diff --git a/src/verinfast/agent.py b/src/verinfast/agent.py
index 54cbf5e..5dfcf5f 100755
--- a/src/verinfast/agent.py
+++ b/src/verinfast/agent.py
@@ -1,4 +1,5 @@
 #!/usr/bin/env python3
+from cachehash.main import Cache
 from datetime import date
 import json
 import os
@@ -16,7 +17,7 @@
 import semgrep.commands.scan as semgrep_scan
 import httpx
 
-from jinja2 import Environment, FileSystemLoader
+from jinja2 import Environment, FileSystemLoader, select_autoescape
 from pygments_tsx.tsx import patch_pygments
 
 from verinfast.utils.utils import DebugLog, std_exec, trimLineBreaks, escapeChars, truncate, truncate_children, get_repo_name_url_and_branch
@@ -73,10 +74,21 @@ def __init__(self):
         self.config.upload_logs = initial_prompt()
         self.directory = save_path()
 
+        # Initialize cache
+        cache_dir = Path(Path.home(), '.verinfast_cache')
+        db_path = Path(cache_dir, 'semgrep.db')
+        if not db_path.parent.exists():
+            db_path.parent.mkdir(parents=True, exist_ok=True)
+
+        self.cache = Cache(db_path, "semgrep_cache")
+
     def create_template(self):
         if not self.config.dry:
             with open(f"{self.config.output_dir}/results.html", "w") as f:
-                jinja_env = Environment(loader=FileSystemLoader(templates_folder))
+                jinja_env = Environment(
+                    loader=FileSystemLoader(templates_folder),
+                    autoescape=select_autoescape(['html', 'xml'])
+                )
                 jinja_env.globals.update(zip=zip, sorted=sorted)
                 output = jinja_env.get_template("results.j2").render(template_definition)
                 f.write(output)
@@ -433,40 +445,61 @@ def parseRepo(self, path: str, repo_name: str, branch: str = None):
                 https://github.com/returntocorp/semgrep/issues/1330
                 """)
 
-        findings_output_file = os.path.join(self.config.output_dir, repo_name + ".findings.json")
+        findings_file = os.path.join(
+            self.config.output_dir,
+            f"{repo_name}.findings.json"
+        )
+
+        custom_args = [
+            "--config", "auto",
+            "--json",
+            f"--json-output={findings_file}",
+            "-q"
+        ]
+
         findings_success = False
         if not self.config.dry:
             self.log(msg=repo_name, tag="Scanning repository", display=True)
             try:
-                with open(findings_output_file, 'a') as o:
-                    custom_args = [
-                        "--config",
-                        "auto",
-                        "--json",
-                        f"--json-output={findings_output_file}",
-                        "-q"
-                    ]
-                    try:
-                        with contextlib.redirect_stdout(io.StringIO()):
-                            semgrep_scan.scan(custom_args)
-                        findings_success = True
-                    except SystemExit as e:
-                        if e.code == 0:
-                            findings_success = True
-                        else:
-                            self.log(tag="ERROR", msg="SystemExit in Semgrep")
-                            self.log(e)
+                with contextlib.redirect_stdout(io.StringIO()):
+                    semgrep_scan.scan(custom_args)
+                findings_success = True
+            except SystemExit as e:
+                if e.code == 0:
+                    findings_success = True
+                else:
+                    self.log(tag="ERROR", msg="SystemExit in Semgrep")
+                    self.log(e)
             except Exception as e:
                 self.log(tag="ERROR", msg="Error in Semgrep")
                 self.log(e)
 
+        # Only try to cache if the scan was successful and the file exists
+        if findings_success and os.path.exists(findings_file):
+            try:
+                with open(findings_file) as f:
+                    results = json.load(f)
+                self.cache.set(findings_file, results)
+            except Exception as e:
+                self.log(
+                    tag="Cache Error",
+                    msg=f"Failed to cache results: {str(e)}"
+                )
+
         if findings_success:
             try:
-                with open(findings_output_file) as f:
+                with open(findings_file) as f:
                     findings = json.load(f)
                     # This is on purpose. If you try to read same pointer
                     # twice, it dies.
-                with open(findings_output_file) as f:
+                with open(findings_file) as f:
                     original_findings = json.load(f)
 
                 if self.config.truncate_findings:
@@ -505,41 +538,53 @@ def parseRepo(self, path: str, repo_name: str, branch: str = None):
                         sort_keys=True
                     )
                 )
-                with open(findings_output_file, "w") as f2:
+                with open(findings_file, "w") as f2:
                     f2.write(json.dumps(
                         findings,
                         indent=4,
                         sort_keys=True
                     ))
                 template_definition["gitfindings"] = findings
             except Exception as e:
                 if not self.config.dry:
-                    self.log(tag="ERROR", msg="Error in findings post-processing")
+                    self.log(tag="ERROR",
+                             msg="Error in findings post-processing")
                     self.log(e)
         else:
             self.log(
                 msg=f'''
                     Attempted to format/truncate non existent file
-                    {findings_output_file}
+                    {findings_file}
                 '''
             )
+        # End if findings_success is True
 
         # Upload findings always, in case of dry run
         # .upload checks should_upload
         self.upload(
-            file=findings_output_file,
+            file=findings_file,
             route="findings",
             source=repo_name
        )
 
         # ##### Scan Dependencies ######
         if self.config.runDependencies:
-            dependencies_output_file = os.path.join(self.config.output_dir, repo_name + ".dependencies.json")
+            dependencies_output_file = os.path.join(
+                self.config.output_dir,
+                f"{repo_name}.dependencies.json"
+            )
             self.log(msg=repo_name, tag="Scanning dependencies", display=True)
             if not self.config.dry:
-                dependencies_output_file = dependency_walk(output_file=dependencies_output_file, logger=self.log)
+                dependencies_output_file = dependency_walk(
+                    output_file=dependencies_output_file,
+                    logger=self.log
+                )
                 with open(dependencies_output_file, "r") as f:
                     template_definition["dependencies"] = json.load(f)
-            self.log(msg=dependencies_output_file, tag="Dependency File", display=False)
+            self.log(
+                msg=dependencies_output_file,
+                tag="Dependency File",
+                display=False
+            )
             self.upload(
                 file=dependencies_output_file,
                 route="dependencies",
@@ -550,13 +595,14 @@ def preflight(self):
         # If the 'dry' configuration is set, skip the preflight checks
         if self.config.dry:
             return
-        # Loop over all remote repositories from config file
-        print("\n\n\nChecking your system's compatibility with the scan configuration:\n")
+        print("\n\n\nChecking your system's compatibility with the scan "
+              "configuration:\n")
 
         if 'repos' in self.config.config:
             repos = self.config.config["repos"]
             if repos:
-                for repo_url in [r for r in repos if len(r) > 0]:  # ignore blank lines from server
+                # ignore blank lines from server
+                for repo_url in [r for r in repos if len(r) > 0]:
                     match = re.search(r"([^/]*\.git.*)", repo_url)
                     if match:
                         repo_name = match.group(1)
@@ -568,19 +614,34 @@ def preflight(self):
                         repo_url = repo_url.split("@")[0]
                     try:
                         subprocess.check_output(["git", "ls-remote", repo_url])
-                        self.log(tag="Repository access confirmed", msg=repo_url, display=True, timestamp=False)
+                        self.log(tag="Repository access confirmed",
+                                 msg=repo_url,
+                                 display=True,
+                                 timestamp=False)
                     except subprocess.CalledProcessError:
-                        self.log(msg=repo_url, tag="Unable to access", display=True, timestamp=False)
-                        self.log(msg=repo_url, tag="Repository will not be scanned", display=True, timestamp=False)
+                        self.log(msg=repo_url,
+                                 tag="Unable to access",
+                                 display=True,
+                                 timestamp=False)
+                        self.log(msg=repo_url,
+                                 tag="Repository will not be scanned",
+                                 display=True,
+                                 timestamp=False)
 
         cloud_config = self.config.modules.cloud
         if cloud_config is not None:
             for provider in cloud_config:
                 try:
-                    if provider.provider == "aws" and self.checkDependency("aws", "AWS Command-line tool"):
+                    if (provider.provider == "aws" and
+                            self.checkDependency("aws",
+                                                 "AWS Command-line tool")):
                         account_id = str(provider.account).replace('-', '')
                         if find_profile(account_id, self.log) is None:
-                            self.log(tag=f"No matching AWS CLI profiles found for {provider.account}", msg="Account can't be scanned.", display=True, timestamp=False)
+                            self.log(
+                                tag=f"No matching AWS CLI profiles found for {provider.account}",
+                                msg="Account can't be scanned.",
+                                display=True,
+                                timestamp=False
+                            )
                         else:
                             self.log(tag="AWS account access confirmed", msg=account_id, display=True, timestamp=False)
                     if provider.provider == "azure" and self.checkDependency("az", "Azure Command-line tool"):
diff --git a/tests/test_cache.py b/tests/test_cache.py
new file mode 100644
index 0000000..e3e1170
--- /dev/null
+++ b/tests/test_cache.py
@@ -0,0 +1,73 @@
+from pathlib import Path
+import time
+from verinfast.agent import Agent
+from verinfast.config import Config
+
+
+def setup_test_file():
+    # Create a simple test file
+    test_content = """
+def test_function():
+    password = "hardcoded_password"  # This should trigger a semgrep finding
+    return password
+"""
+    test_file = Path("test_sample.py")
+    with open(test_file, "w") as f:
+        f.write(test_content)
+    return test_file
+
+
+def test_semgrep_cache():
+    # Create test file
+    test_file = setup_test_file()
+
+    # Minimal configuration
+    config = Config()
+    config.runGit = False
+    config.runSizes = False
+    config.runStats = False
+    config.runDependencies = False
+    config.config["local_repos"] = [str(test_file.parent.absolute())]
+
+    try:
+        # First run
+        print("\nRunning first scan...")
+        start_time = time.time()
+        agent = Agent()
+        agent.config = config
+        agent.scan()
+        first_duration = time.time() - start_time
+        print(f"First scan took: {first_duration:.2f} seconds")
+
+        # Second run should use cache
+        print("\nRunning second scan...")
+        start_time = time.time()
+        agent2 = Agent()
+        agent2.config = config
+        agent2.scan()
+        second_duration = time.time() - start_time
+        print(f"Second scan took: {second_duration:.2f} seconds")
+
+        assert second_duration < first_duration  # Second run should be faster
+        assert Path(Path.home(), '.verinfast_cache/semgrep.db').exists()
+
+    finally:
+        # Cleanup
+        if test_file.exists():
+            test_file.unlink()
+
+
+def test_cache_persistence():
+    cache_path = Path(Path.home(), '.verinfast_cache/semgrep.db')
+
+    # First run creates cache
+    agent = Agent()
+    agent.config.runGit = False
+    agent.config.runSizes = False
+    agent.config.runStats = False
+    agent.config.runDependencies = False
+    agent.scan()
+
+    # Verify cache exists and has content
+    assert cache_path.exists()
+    assert cache_path.stat().st_size > 0
diff --git a/tests/test_dry.py b/tests/test_dry.py
index 666830a..3edc1b5 100644
--- a/tests/test_dry.py
+++ b/tests/test_dry.py
@@ -44,6 +44,7 @@ def test_no_config(self):
         # Check if there are any JSON files
         json_files = list(results_path.glob("*.json"))
         assert not json_files, f"Found JSON files: {json_files}"
+        # Since this test creates its own DebugLog, the results
         # dir will have two log files. One is timestamped and is
         # the start of the regular log the the agent creates on start.