Skip to content

Commit

Permalink
Merge pull request #194 from Yelp/verifiable-secrets
Browse files Browse the repository at this point in the history
Verifiable secrets scaffolding
  • Loading branch information
domanchi authored Jun 21, 2019
2 parents a7daccc + 3622f28 commit 13575ed
Show file tree
Hide file tree
Showing 23 changed files with 585 additions and 42 deletions.
6 changes: 3 additions & 3 deletions detect_secrets/core/baseline.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def initialize(
plugins,
exclude_files_regex=None,
exclude_lines_regex=None,
scan_all_files=False,
should_scan_all_files=False,
):
"""Scans the entire codebase for secrets, and returns a
SecretsCollection object.
Expand All @@ -29,7 +29,7 @@ def initialize(
:type exclude_files_regex: str|None
:type exclude_lines_regex: str|None
:type path: list
:type scan_all_files: bool
:type should_scan_all_files: bool
:rtype: SecretsCollection
"""
Expand All @@ -42,7 +42,7 @@ def initialize(
files_to_scan = []
for element in path:
if os.path.isdir(element):
if scan_all_files:
if should_scan_all_files:
files_to_scan.extend(_get_files_recursively(element))
else:
files = _get_git_tracked_files(element)
Expand Down
9 changes: 9 additions & 0 deletions detect_secrets/core/constants.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from enum import Enum


# We don't scan files with these extensions.
# NOTE: We might be able to do this better with
# `subprocess.check_output(['file', filename])`
Expand Down Expand Up @@ -26,3 +29,9 @@
'webp',
'zip',
}


class VerifiedResult(Enum):
UNVERIFIED = 1
VERIFIED_FALSE = 2
VERIFIED_TRUE = 3
15 changes: 15 additions & 0 deletions detect_secrets/core/potential_secret.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,26 @@ def __init__(
:type is_secret: bool|None
:param is_secret: whether or not the secret is a true- or false- positive
:type is_verified: bool
:param is_verified: whether the secret has been externally verified
"""
self.type = typ
self.filename = filename
self.lineno = lineno
self.secret_hash = self.hash_secret(secret)
self.is_secret = is_secret
self.is_verified = False

# NOTE: Originally, we never wanted to keep the secret value in memory,
# after finding it in the codebase. However, to support verifiable
# secrets (and avoid the pain of re-scanning again), we need to
# keep the plaintext in memory as such.
#
# This value should never appear in the baseline though, seeing that
# we don't want to create a file that contains all plaintext secrets
# in the repository.
self.secret_value = secret

# If two PotentialSecrets have the same values for these fields,
# they are considered equal. Note that line numbers aren't included
Expand All @@ -69,6 +83,7 @@ def json(self):
'filename': self.filename,
'line_number': self.lineno,
'hashed_secret': self.secret_hash,
'is_verified': self.is_verified,
}

if self.is_secret is not None:
Expand Down
13 changes: 8 additions & 5 deletions detect_secrets/core/secrets_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,14 @@ def load_baseline_from_dict(cls, data):
plugins = []
for plugin in data['plugins_used']:
plugin_classname = plugin.pop('name')
plugins.append(initialize.from_plugin_classname(
plugin_classname,
exclude_lines_regex=result.exclude_lines,
**plugin
))
plugins.append(
initialize.from_plugin_classname(
plugin_classname,
exclude_lines_regex=result.exclude_lines,
should_verify_secrets=False,
**plugin
),
)
result.plugins = tuple(plugins)

for filename in data['results']:
Expand Down
19 changes: 18 additions & 1 deletion detect_secrets/core/usage.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,15 @@ def add_use_all_plugins_argument(parser):
)


def add_no_verify_flag(parser):
parser.add_argument(
'-n',
'--no-verify',
action='store_true',
help='Disables additional verification of secrets via network call.',
)


class ParserBuilder(object):

def __init__(self):
Expand All @@ -37,7 +46,8 @@ def add_pre_commit_arguments(self):
self._add_filenames_argument()\
._add_set_baseline_argument()\
._add_exclude_lines_argument()\
._add_use_all_plugins_argument()
._add_use_all_plugins_argument()\
._add_no_verify_flag()

PluginOptions(self.parser).add_arguments()

Expand Down Expand Up @@ -100,6 +110,11 @@ def _add_exclude_lines_argument(self):

def _add_use_all_plugins_argument(self):
add_use_all_plugins_argument(self.parser)
return self

def _add_no_verify_flag(self):
add_no_verify_flag(self.parser)
return self


class ScanOptions(object):
Expand Down Expand Up @@ -161,6 +176,8 @@ def _add_initialize_baseline_argument(self):
help='Scan all files recursively (as compared to only scanning git tracked files).',
)

add_no_verify_flag(self.parser)

return self

def _add_adhoc_scanning_argument(self):
Expand Down
4 changes: 2 additions & 2 deletions detect_secrets/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def main(argv=None):
plugins = initialize.from_parser_builder(
args.plugins,
exclude_lines_regex=args.exclude_lines,
should_verify_secrets=not args.no_verify,
)
if args.string:
line = args.string
Expand Down Expand Up @@ -115,7 +116,6 @@ def _perform_scan(args, plugins):
:rtype: dict
"""

old_baseline = _get_existing_baseline(args.import_filename)
if old_baseline:
plugins = initialize.merge_plugin_from_baseline(
Expand Down Expand Up @@ -144,7 +144,7 @@ def _perform_scan(args, plugins):
exclude_files_regex=args.exclude_files,
exclude_lines_regex=args.exclude_lines,
path=args.path,
scan_all_files=args.all_files,
should_scan_all_files=args.all_files,
).format_for_baseline_output()

if old_baseline:
Expand Down
167 changes: 167 additions & 0 deletions detect_secrets/plugins/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,17 @@
"""
from __future__ import absolute_import

import hashlib
import hmac
import re
import string
import textwrap
from datetime import datetime

import requests

from .base import RegexBasedDetector
from detect_secrets.core.constants import VerifiedResult


class AWSKeyDetector(RegexBasedDetector):
Expand All @@ -15,3 +23,162 @@ class AWSKeyDetector(RegexBasedDetector):
denylist = (
re.compile(r'AKIA[0-9A-Z]{16}'),
)

def verify(self, token, content):
secret_access_key = get_secret_access_key(content)
if not secret_access_key:
return VerifiedResult.UNVERIFIED

for candidate in secret_access_key:
if verify_aws_secret_access_key(token, candidate):
return VerifiedResult.VERIFIED_TRUE

return VerifiedResult.VERIFIED_FALSE


def get_secret_access_key(content):
# AWS secret access keys are 40 characters long.
regex = re.compile(
r'= *([\'"]?)([%s]{40})(\1)$' % (
string.ascii_letters + string.digits + '+/='
),
)

return [
match[1]
for line in content.splitlines()
for match in regex.findall(line)
]


def verify_aws_secret_access_key(key, secret): # pragma: no cover
"""
Using requests, because we don't want to require boto3 for this one
optional verification step.
Loosely based off:
https://docs.aws.amazon.com/general/latest/gr/sigv4-signed-request-examples.html
:type key: str
:type secret: str
"""
now = datetime.utcnow()
amazon_datetime = now.strftime('%Y%m%dT%H%M%SZ')

headers = {
# This is a required header for the signing process
'Host': 'sts.amazonaws.com',
'X-Amz-Date': amazon_datetime,
}
body = {
'Action': 'GetCallerIdentity',
'Version': '2011-06-15',
}

# Step #1: Canonical Request
signed_headers = ';'.join(
map(
lambda x: x.lower(),
headers.keys(),
),
)
canonical_request = textwrap.dedent("""
POST
/
{headers}
{signed_headers}
{hashed_payload}
""")[1:-1].format(

headers='\n'.join([
'{}:{}'.format(header.lower(), value)
for header, value in headers.items()
]),
signed_headers=signed_headers,

# Poor man's method, but works for this use case.
hashed_payload=hashlib.sha256(
'&'.join([
'{}={}'.format(header, value)
for header, value in body.items()
]).encode('utf-8'),
).hexdigest(),
)

# Step #2: String to Sign
region = 'us-east-1'
scope = '{request_date}/{region}/sts/aws4_request'.format(
request_date=now.strftime('%Y%m%d'),

# STS is a global service; this is just for latency control.
region=region,
)

string_to_sign = textwrap.dedent("""
AWS4-HMAC-SHA256
{request_datetime}
{scope}
{hashed_canonical_request}
""")[1:-1].format(
request_datetime=amazon_datetime,
scope=scope,
hashed_canonical_request=hashlib.sha256(
canonical_request.encode('utf-8'),
).hexdigest(),
)

# Step #3: Calculate signature
signing_key = _sign(
_sign(
_sign(
_sign(
'AWS4{}'.format(secret).encode('utf-8'),
now.strftime('%Y%m%d'),
),
region,
),
'sts',
),
'aws4_request',
)

signature = _sign(
signing_key,
string_to_sign,
hex=True,
)

# Step #4: Add to request headers
headers['Authorization'] = (
'AWS4-HMAC-SHA256 '
'Credential={access_key}/{scope}, '
'SignedHeaders={signed_headers}, '
'Signature={signature}'
).format(
access_key=key,
scope=scope,
signed_headers=signed_headers,
signature=signature,
)

# Step #5: Finally send the request
response = requests.post(
'https://sts.amazonaws.com',
headers=headers,
data=body,
)

if response.status_code == 403:
return False

return True


def _sign(key, message, hex=False): # pragma: no cover
value = hmac.new(key, message.encode('utf-8'), hashlib.sha256)
if not hex:
return value.digest()

return value.hexdigest()
Loading

0 comments on commit 13575ed

Please sign in to comment.