Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add plugin for IBM Cloud Object Storage HMAC #263

Merged
merged 8 commits into from
Dec 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 163 additions & 0 deletions detect_secrets/plugins/ibm_cos_hmac.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
from __future__ import absolute_import

import datetime
import hashlib
import hmac

import requests

from .base import RegexBasedDetector
from detect_secrets.core.constants import VerifiedResult


class IbmCosHmacDetector(RegexBasedDetector):
"""Scans for IBM Cloud Object Storage HMAC credentials."""
# requires 3 factors
#
# access_key: access_key_id
# secret_key: secret_access_key
# host, defaults to 's3.us.cloud-object-storage.appdomain.cloud'

secret_type = 'IBM COS HMAC Credentials'

token_prefix = r'(?:(?:ibm)?[-_]?cos[-_]?(?:hmac)?|)'
password_keyword = r'(?:secret[-_]?(?:access)?[-_]?key)'
password = r'([a-f0-9]{48})'
denylist = (
RegexBasedDetector.assign_regex_generator(
prefix_regex=token_prefix,
secret_keyword_regex=password_keyword,
secret_regex=password,
),
)

def verify(self, token, content):
key_id_matches = find_access_key_id(content)

if not key_id_matches:
return VerifiedResult.UNVERIFIED

try:
for key_id in key_id_matches:
verify_result = verify_ibm_cos_hmac_credentials(
key_id, token,
)
if verify_result:
return VerifiedResult.VERIFIED_TRUE
except requests.exceptions.RequestException:
return VerifiedResult.UNVERIFIED

return VerifiedResult.VERIFIED_FALSE


def find_access_key_id(content):
key_id_keyword_regex = r'(?:access[-_]?(?:key)?[-_]?(?:id)?|key[-_]?id)'
key_id_regex = r'([a-f0-9]{32})'

regex = RegexBasedDetector.assign_regex_generator(
prefix_regex=IbmCosHmacDetector.token_prefix,
secret_keyword_regex=key_id_keyword_regex,
secret_regex=key_id_regex,
)

return [
match
for line in content.splitlines()
for match in regex.findall(line)
]


def hash(key, msg):
return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest()


def createSignatureKey(key, datestamp, region, service):

keyDate = hash(('AWS4' + key).encode('utf-8'), datestamp)
keyString = hash(keyDate, region)
keyService = hash(keyString, service)
keySigning = hash(keyService, 'aws4_request')
return keySigning


def verify_ibm_cos_hmac_credentials(
access_key,
secret_key,
host='s3.us.cloud-object-storage.appdomain.cloud',
):
response = query_ibm_cos_hmac(access_key, secret_key, host)
return response.status_code == 200


def query_ibm_cos_hmac(
access_key,
secret_key,
host='s3.us.cloud-object-storage.appdomain.cloud',
):
# Sample code referenced from link below
# https://cloud.ibm.com/docs/services/cloud-object-storage/api-reference?topic=cloud-object-storage-hmac-signature # noqa: E501

# request elements
http_method = 'GET'
# region is a wildcard value that takes the place of the AWS region value
# as COS doen't use the same conventions for regions, this parameter can accept any string
region = 'us-standard'
endpoint = 'https://{}'.format(host)
bucket = '' # add a '/' before the bucket name to list buckets
object_key = ''
request_parameters = ''

# assemble the standardized request
time = datetime.datetime.utcnow()
timestamp = time.strftime('%Y%m%dT%H%M%SZ')
datestamp = time.strftime('%Y%m%d')

standardized_resource = '/' + bucket + '/' + object_key
standardized_querystring = request_parameters
standardized_headers = 'host:' + host + '\n' + 'x-amz-date:' + timestamp + '\n'
signed_headers = 'host;x-amz-date'
payload_hash = hashlib.sha256(''.encode('utf-8')).hexdigest()

standardized_request = (
http_method + '\n'
+ standardized_resource + '\n'
+ standardized_querystring + '\n'
+ standardized_headers + '\n'
+ signed_headers + '\n'
+ payload_hash
).encode('utf-8')

# assemble string-to-sign
hashing_algorithm = 'AWS4-HMAC-SHA256'
credential_scope = datestamp + '/' + region + '/' + 's3' + '/' + 'aws4_request'
sts = (
hashing_algorithm + '\n'
+ timestamp + '\n'
+ credential_scope + '\n'
+ hashlib.sha256(standardized_request).hexdigest()
)

# generate the signature
signature_key = createSignatureKey(secret_key, datestamp, region, 's3')
signature = hmac.new(
signature_key,
(sts).encode('utf-8'),
hashlib.sha256,
).hexdigest()

# assemble all elements into the 'authorization' header
v4auth_header = (
hashing_algorithm + ' '
+ 'Credential=' + access_key + '/' + credential_scope + ', '
+ 'SignedHeaders=' + signed_headers + ', '
+ 'Signature=' + signature
)

# create and send the request
headers = {'x-amz-date': timestamp, 'Authorization': v4auth_header}
# the 'requests' package autmatically adds the required 'host' header
request_url = endpoint + standardized_resource + standardized_querystring

request = requests.get(request_url, headers=headers)

return request
173 changes: 173 additions & 0 deletions tests/plugins/ibm_cos_hmac_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
from __future__ import absolute_import

import textwrap

import pytest
import requests
import responses
from mock import patch

from detect_secrets.core.constants import VerifiedResult
from detect_secrets.plugins.ibm_cos_hmac import find_access_key_id
from detect_secrets.plugins.ibm_cos_hmac import IbmCosHmacDetector
from detect_secrets.plugins.ibm_cos_hmac import verify_ibm_cos_hmac_credentials


ACCESS_KEY_ID = '1234567890abcdef1234567890abcdef'
SECRET_ACCESS_KEY = '1234567890abcdef1234567890abcdef1234567890abcdef'


class TestIbmCosHmacDetector(object):

@pytest.mark.parametrize(
'payload, should_flag',
[
('"secret_access_key": "1234567890abcdef1234567890abcdef1234567890abcdef"', True),
('secret_access_key=1234567890abcdef1234567890abcdef1234567890abcdef', True),
('secret_access_key="1234567890abcdef1234567890abcdef1234567890abcdef"', True),
('secret_access_key=\'1234567890abcdef1234567890abcdef1234567890abcdef\'', True),
('secret_access_key = "1234567890abcdef1234567890abcdef1234567890abcdef"', True),
(
'COS_HMAC_SECRET_ACCESS_KEY = "1234567890abcdef1234567890abcdef1234567890abcdef"',
True,
),
(
'ibm_cos_SECRET_ACCESS_KEY = "1234567890abcdef1234567890abcdef1234567890abcdef"',
True,
),
(
'ibm_cos_secret_access_key = "1234567890abcdef1234567890abcdef1234567890abcdef"',
True,
),
('ibm_cos_secret_key = "1234567890abcdef1234567890abcdef1234567890abcdef"', True),
('cos_secret_key = "1234567890abcdef1234567890abcdef1234567890abcdef"', True),
('ibm-cos_secret_key = "1234567890abcdef1234567890abcdef1234567890abcdef"', True),
('cos-hmac_secret_key = "1234567890abcdef1234567890abcdef1234567890abcdef"', True),
('coshmac_secret_key = "1234567890abcdef1234567890abcdef1234567890abcdef"', True),
('ibmcoshmac_secret_key = "1234567890abcdef1234567890abcdef1234567890abcdef"', True),
('ibmcos_secret_key = "1234567890abcdef1234567890abcdef1234567890abcdef"', True),
('not_secret = notapassword', False),
('someotherpassword = "doesnt start right"', False),
],
)
def test_analyze_string(self, payload, should_flag):
logic = IbmCosHmacDetector()

output = logic.analyze_line(payload, 1, 'mock_filename')
assert len(output) == int(should_flag)

@patch('detect_secrets.plugins.ibm_cos_hmac.verify_ibm_cos_hmac_credentials')
def test_verify_invalid_secret(self, mock_hmac_verify):
mock_hmac_verify.return_value = False

assert IbmCosHmacDetector().verify(
SECRET_ACCESS_KEY,
'''access_key_id={}'''.format(ACCESS_KEY_ID),
) == VerifiedResult.VERIFIED_FALSE

mock_hmac_verify.assert_called_with(ACCESS_KEY_ID, SECRET_ACCESS_KEY)

@patch('detect_secrets.plugins.ibm_cos_hmac.verify_ibm_cos_hmac_credentials')
def test_verify_valid_secret(self, mock_hmac_verify):
mock_hmac_verify.return_value = True

assert IbmCosHmacDetector().verify(
SECRET_ACCESS_KEY,
'''access_key_id={}'''.format(ACCESS_KEY_ID),
) == VerifiedResult.VERIFIED_TRUE

mock_hmac_verify.assert_called_with(ACCESS_KEY_ID, SECRET_ACCESS_KEY)

@patch('detect_secrets.plugins.ibm_cos_hmac.verify_ibm_cos_hmac_credentials')
def test_verify_unverified_secret(self, mock_hmac_verify):
mock_hmac_verify.side_effect = requests.exceptions.RequestException('oops')

assert IbmCosHmacDetector().verify(
SECRET_ACCESS_KEY,
'''access_key_id={}'''.format(ACCESS_KEY_ID),
) == VerifiedResult.UNVERIFIED

mock_hmac_verify.assert_called_with(ACCESS_KEY_ID, SECRET_ACCESS_KEY)

@patch('detect_secrets.plugins.ibm_cos_hmac.verify_ibm_cos_hmac_credentials')
def test_verify_unverified_secret_no_match(self, mock_hmac_verify):
mock_hmac_verify.side_effect = requests.exceptions.RequestException('oops')

assert IbmCosHmacDetector().verify(
SECRET_ACCESS_KEY,
'''something={}'''.format(ACCESS_KEY_ID),
) == VerifiedResult.UNVERIFIED

mock_hmac_verify.assert_not_called()

@pytest.mark.parametrize(
'content, expected_output',
(
(
textwrap.dedent("""
access_key_id = {}
""")[1:-1].format(
ACCESS_KEY_ID,
),
[ACCESS_KEY_ID],
),
(
'access_key_id = {}'.format(ACCESS_KEY_ID),
[ACCESS_KEY_ID],
),
(
'access-key-id := {}'.format(ACCESS_KEY_ID),
[ACCESS_KEY_ID],
),
(
"\"access_id\":\"{}\"".format(ACCESS_KEY_ID),
[ACCESS_KEY_ID],
),
(
"key_id = \"{}\"".format(ACCESS_KEY_ID),
[ACCESS_KEY_ID],
),
(
"key-id = '{}'".format(ACCESS_KEY_ID),
[ACCESS_KEY_ID],
),
(
"access_key = '{}'".format(ACCESS_KEY_ID),
[ACCESS_KEY_ID],
),
(
"[\"access_key_id\"] = '{}'".format(ACCESS_KEY_ID),
[ACCESS_KEY_ID],
),
(
'id = {}'.format(ACCESS_KEY_ID),
[],
),
),
)
def test_find_access_key_id(self, content, expected_output):
assert find_access_key_id(content) == expected_output


@pytest.mark.parametrize(
'status_code, validation_result',
[
(200, True),
(403, False),
],
)
@responses.activate
def test_verify_ibm_cos_hmac_credentials(status_code, validation_result):
host = 'fake.s3.us.cloud-object-storage.appdomain.cloud'
responses.add(
responses.GET, 'https://{}//'.format(host),
json={'some': 'thing'}, status=status_code,
)

assert verify_ibm_cos_hmac_credentials(
ACCESS_KEY_ID, SECRET_ACCESS_KEY, host,
) is validation_result
assert len(responses.calls) == 1
headers = responses.calls[0].request.headers
assert headers['Authorization'].startswith('AWS4-HMAC-SHA256')
assert headers['x-amz-date'] is not None