From d4e8bc88385030bb654e562fb52bd1870907e35f Mon Sep 17 00:00:00 2001 From: Jeckelmann Manuel Date: Wed, 13 Jun 2018 22:07:24 +0200 Subject: [PATCH] Added a 'force_clone' flag, that allows for a decision on how to handle local ('file://') repositories --- test_all.py | 39 ++++++++++++++++++++++++++++++++++++++- truffleHog/truffleHog.py | 40 +++++++++++++++++++++++++++++++--------- 2 files changed, 69 insertions(+), 10 deletions(-) diff --git a/test_all.py b/test_all.py index 3fc2aad2694f..8a182669286d 100644 --- a/test_all.py +++ b/test_all.py @@ -1,5 +1,6 @@ import unittest import os +import re from truffleHog import truffleHog @@ -12,7 +13,7 @@ def test_shannon(self): self.assertGreater(truffleHog.shannon_entropy(random_stringHex, truffleHog.HEX_CHARS), 3) def test_cloning(self): - project_path = truffleHog.clone_git_repo("https://github.com/dxa4481/truffleHog.git") + project_path, _ = truffleHog.clone_git_repo("https://github.com/dxa4481/truffleHog.git") license_file = os.path.join(project_path, "LICENSE") self.assertTrue(os.path.isfile(license_file)) @@ -22,5 +23,41 @@ def test_unicode_expection(self): except UnicodeEncodeError: self.fail("Unicode print error") + +class TestRepoTypes(unittest.TestCase): + def test_file_repo(self): + # First, we'll clone the remote repo + git_url = "https://github.com/dxa4481/truffleHog.git" + project_path_1, c = truffleHog.clone_git_repo(git_url) + self.assertTrue(re.search(r'^/tmp/', project_path_1)) + + # Second, we'll use a local repo without cloning + project_path_2, c = truffleHog.clone_git_repo('file://' + project_path_1) + self.assertTrue(re.search(r'^/tmp/', project_path_2)) + self.assertEqual(project_path_1, project_path_2) + + # Third, we'll use a sloppy filepath as a project repo address + project_path_3, c = truffleHog.clone_git_repo(project_path_2) + self.assertTrue(re.search(r'^/tmp/', project_path_3)) + self.assertEqual(project_path_2, project_path_3) + + # Fourth, we'll force another clone from a local repo + project_path_4, c = truffleHog.clone_git_repo('file://' + project_path_3, force=True) + self.assertTrue(re.search(r'^/tmp/', project_path_4)) + self.assertNotEqual(project_path_3, project_path_4) + + def test_remove_only_temp_repos(self): + # First, we'll clone the remote repo + git_url = "https://github.com/dxa4481/truffleHog.git" + project_path, created = truffleHog.clone_git_repo(git_url) + self.assertTrue(re.search(r'^/tmp/', project_path)) + self.assertTrue(created) + + # Second, we'll use a local repo without cloning to find strigs + truffleHog.find_strings('file://' + project_path) + self.assertTrue(os.path.exists(project_path)) + + + if __name__ == '__main__': unittest.main() diff --git a/truffleHog/truffleHog.py b/truffleHog/truffleHog.py index 372480f342d1..1a305bc6bdea 100644 --- a/truffleHog/truffleHog.py +++ b/truffleHog/truffleHog.py @@ -27,6 +27,7 @@ def main(): parser.add_argument("--entropy", dest="do_entropy", help="Enable entropy checks") parser.add_argument("--since_commit", dest="since_commit", help="Only scan from a given commit hash") parser.add_argument("--max_depth", dest="max_depth", help="The max commit depth to go back when searching for secrets") + parser.add_argument("-f, --force_clone", action='store_true', dest="force_clone", help="Ensure the given git repository is cloned, even if it's already on disk (file://...); Remote repositories are always cloned;") parser.add_argument('git_url', type=str, help='URL for secret searching') parser.set_defaults(regex=False) parser.set_defaults(rules={}) @@ -48,9 +49,7 @@ def main(): for regex in rules: regexes[regex] = rules[regex] do_entropy = str2bool(args.do_entropy) - output = find_strings(args.git_url, args.since_commit, args.max_depth, args.output_json, args.do_regex, do_entropy, surpress_output=False) - project_path = output["project_path"] - shutil.rmtree(project_path, onerror=del_rw) + output = find_strings(args.git_url, args.since_commit, args.max_depth, args.output_json, args.do_regex, do_entropy, args.force_clone, surpress_output=False) if output["foundIssues"]: sys.exit(1) else: @@ -115,10 +114,28 @@ class bcolors: BOLD = '\033[1m' UNDERLINE = '\033[4m' -def clone_git_repo(git_url): - project_path = tempfile.mkdtemp() - Repo.clone_from(git_url, project_path) - return project_path +def clone_git_repo(git_url, force=False): + """ + Clone a git repo to a local temporary path; + SKip cloning if repo is addressed via file://... unless + the ``force`` flag is set. + """ + if '://' in git_url: + scheme, uri = git_url.split('://') + else: + scheme, uri = ('file', git_url,) + + # Reconstruct a proper URL to support sloppy filepaths + git_url = '{scheme}://{uri}'.format(scheme=scheme, uri=uri) + + project_path_created = True + if scheme == 'file' and not force: + project_path = uri + project_path_created = False + else: + project_path = tempfile.mkdtemp() + Repo.clone_from(git_url, project_path) + return project_path, project_path_created def print_results(printJson, issue): commit_time = issue['date'] @@ -240,9 +257,9 @@ def handle_results(output, output_dir, foundIssues): output["foundIssues"].append(result_path) return output -def find_strings(git_url, since_commit=None, max_depth=1000000, printJson=False, do_regex=False, do_entropy=True, surpress_output=True, custom_regexes={}): +def find_strings(git_url, since_commit=None, max_depth=1000000, printJson=False, do_regex=False, do_entropy=True, force_clone=False, surpress_output=True, custom_regexes={}): output = {"foundIssues": []} - project_path = clone_git_repo(git_url) + project_path, project_path_created = clone_git_repo(git_url, force=force_clone) repo = Repo(project_path) already_searched = set() output_dir = tempfile.mkdtemp() @@ -281,6 +298,11 @@ def find_strings(git_url, since_commit=None, max_depth=1000000, printJson=False, output = handle_results(output, output_dir, foundIssues) output["project_path"] = project_path output["clone_uri"] = git_url + + # Cleanup + if project_path_created: + shutil.rmtree(project_path, onerror=del_rw) + return output if __name__ == "__main__":