
Commit

Merge pull request #100 from obsidianforensics/TransportSecurity
Transport security
obsidianforensics authored Sep 20, 2021
2 parents 22fbb34 + da9d633 commit b01e980
Showing 3 changed files with 187 additions and 18 deletions.
31 changes: 30 additions & 1 deletion pyhindsight/analysis.py
@@ -79,7 +79,7 @@ def default(self, obj):

if item.get('source_title'):
item['message'] = f"Watched{item['watch_time']} on {item['source_title']} "\
f"(ending at {item['position']}/{item['media_duration']}) " \
f"(ending at {item['position']}/{item.get('media_duration')}) " \
f"[has_video: {item['has_video']}; has_audio: {item['has_audio']}]"
else:
item['message'] = f"Watched{item['watch_time']} on {item['url']} " \
@@ -212,6 +212,19 @@ def default(self, obj):
del(item['row_type'], item['name'])
return item

if isinstance(obj, Chrome.SiteSetting):
item = HindsightEncoder.base_encoder(obj)

item['timestamp_desc'] = 'Update Time'
item['data_type'] = 'chrome:site_setting:entry'
item['source_long'] = 'Chrome Site Settings'

item['message'] = 'Updated site setting: {}: {}'.format(
item['key'], item['value'])

del(item['row_type'], item['name'])
return item

if isinstance(obj, CacheEntry):
item = HindsightEncoder.base_encoder(obj)

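The new SiteSetting branch above follows the same json.JSONEncoder pattern as the surrounding branches: default() flattens each artifact into a plain dict so the whole timeline can be serialized in one call. A minimal usage sketch under that assumption; parsed_artifacts is a stand-in for the list of artifact objects used by generate_excel()/generate_sqlite() later in this file:

import json
from pyhindsight.analysis import HindsightEncoder

# Hedged sketch: json.dumps() calls HindsightEncoder.default() for any object it
# cannot serialize natively, so a list of parsed artifacts (HistoryItem, SiteSetting,
# CacheEntry, ...) is flattened into plain timeline dicts in one call.
# 'parsed_artifacts' is assumed to be a list of such artifact objects.
timeline_json = json.dumps(parsed_artifacts, cls=HindsightEncoder, indent=4)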
@@ -827,6 +840,15 @@ def generate_excel(self, output_object):
w.write(row_number, 5, item.interpretation, blue_value_format) # interpretation
w.write(row_number, 6, item.profile, blue_value_format) # Profile

elif item.row_type.startswith("site setting"):
w.write_string(row_number, 0, item.row_type, blue_type_format) # record_type
w.write(row_number, 1, friendly_date(item.timestamp), blue_date_format) # date
w.write_string(row_number, 2, item.url, blue_url_format) # URL
w.write_string(row_number, 3, item.name, blue_field_format) # form field name
w.write_string(row_number, 4, item.value, blue_value_format) # username or pw value
w.write(row_number, 5, item.interpretation, blue_value_format) # interpretation
w.write(row_number, 6, item.profile, blue_value_format) # Profile

if friendly_date(item.timestamp) < '1970-01-02':
w.set_row(row_number, options={'hidden': True})

@@ -1094,6 +1116,13 @@ def generate_sqlite(self, output_file_path='.temp_db'):
(item.row_type, friendly_date(item.timestamp), item.url, item.name, item.value,
item.interpretation, item.profile))

elif item.row_type.startswith('site setting'):
c.execute(
'INSERT INTO timeline (type, timestamp, url, title, value, interpretation, profile) '
'VALUES (?, ?, ?, ?, ?, ?, ?)',
(item.row_type, friendly_date(item.timestamp), item.url, item.name, item.value,
item.interpretation, item.profile))

for item in self.parsed_storage:
if item.row_type.startswith('local'):
c.execute(
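Downstream of generate_sqlite(), the new 'site setting' rows share the timeline table with every other artifact type. A small, hedged sketch of pulling them back out with the standard sqlite3 module; the column names follow the INSERT statements above, and '.temp_db' is only the default output_file_path shown in the hunk header:

import sqlite3

# Hedged sketch: query the SQLite timeline produced by generate_sqlite() for the new
# site-setting rows (zoom levels, engagement, HSTS, ...). Columns match the INSERTs above.
conn = sqlite3.connect('.temp_db')
for row_type, timestamp, url, value in conn.execute(
        "SELECT type, timestamp, url, value FROM timeline "
        "WHERE type LIKE 'site setting%' ORDER BY timestamp"):
    print(timestamp, row_type, url, value)
conn.close()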
161 changes: 145 additions & 16 deletions pyhindsight/browsers/chrome.py
@@ -1,4 +1,5 @@
# -*- coding: utf-8 -*-
import hashlib
import os
import sys
import errno
@@ -9,6 +10,8 @@
import logging
import shutil
import puremagic
import urllib
import base64
from pyhindsight.browsers.webbrowser import WebBrowser
from pyhindsight import utils

@@ -36,11 +39,12 @@ class Chrome(WebBrowser):
def __init__(self, profile_path, browser_name=None, cache_path=None, version=None, timezone=None,
parsed_artifacts=None, parsed_storage=None, storage=None, installed_extensions=None,
artifacts_counts=None, artifacts_display=None, available_decrypts=None, preferences=None,
no_copy=None, temp_dir=None, origin_hashes=None):
WebBrowser.__init__(self, profile_path, browser_name=browser_name, cache_path=cache_path, version=version,
timezone=timezone, parsed_artifacts=parsed_artifacts, parsed_storage=parsed_storage,
artifacts_counts=artifacts_counts, artifacts_display=artifacts_display,
preferences=preferences, no_copy=no_copy, temp_dir=temp_dir, origin_hashes=origin_hashes)
no_copy=None, temp_dir=None, origin_hashes=None, hsts_hashes=None):
WebBrowser.__init__(
self, profile_path, browser_name=browser_name, cache_path=cache_path, version=version, timezone=timezone,
parsed_artifacts=parsed_artifacts, parsed_storage=parsed_storage, artifacts_counts=artifacts_counts,
artifacts_display=artifacts_display, preferences=preferences, no_copy=no_copy, temp_dir=temp_dir,
origin_hashes=origin_hashes)
self.profile_path = profile_path
self.browser_name = "Chrome"
self.cache_path = cache_path
@@ -53,6 +57,7 @@ def __init__(self, profile_path, browser_name=None, cache_path=None, version=Non
self.no_copy = no_copy
self.temp_dir = temp_dir
self.origin_hashes = origin_hashes
self.hsts_hashes = hsts_hashes

if self.version is None:
self.version = []
@@ -75,6 +80,9 @@ def __init__(self, profile_path, browser_name=None, cache_path=None, version=Non
if self.origin_hashes is None:
self.origin_hashes = {}

if self.hsts_hashes is None:
self.hsts_hashes = {}

if self.artifacts_counts is None:
self.artifacts_counts = {}

@@ -1331,11 +1339,13 @@ def expand_language_code(code):
append_pref(host, config)
elif isinstance(config, dict):
append_pref(host, config['zoom_level'])
timestamped_preference_items.append(Chrome.PreferenceItem(
timestamped_preference_item = Chrome.SiteSetting(
self.profile_path, url=host,
timestamp=utils.to_datetime(config['last_modified'], self.timezone),
key=f'per_host_zoom_levels [in {preferences_file}.partition]',
value=f'Changed zoom level to {config["zoom_level"]}', interpretation=''))
value=f'Changed zoom level to {config["zoom_level"]}', interpretation='')
timestamped_preference_item.row_type += ' (zoom level)'
timestamped_preference_items.append(timestamped_preference_item)
except Exception as e:
log.exception(f' - Exception parsing Preference item: {e})')

@@ -1366,11 +1376,12 @@ def expand_language_code(code):
for origin, pref_data in \
prefs['profile']['content_settings']['exceptions']['media_engagement'].items():
if pref_data.get('last_modified'):
pref_item = Chrome.PreferenceItem(
pref_item = Chrome.SiteSetting(
self.profile_path, url=origin,
timestamp=utils.to_datetime(pref_data['last_modified'], self.timezone),
key=f'media_engagement [in {preferences_file}.profile.content_settings.exceptions]',
value=str(pref_data), interpretation='')
pref_item.row_type += ' (engagement)'
timestamped_preference_items.append(pref_item)
except Exception as e:
log.exception(f' - Exception parsing Preference item: {e})')
@@ -1385,11 +1396,12 @@ def expand_language_code(code):
for origin, pref_data in \
prefs['profile']['content_settings']['exceptions']['notifications'].items():
if pref_data.get('last_modified'):
pref_item = Chrome.PreferenceItem(
pref_item = Chrome.SiteSetting(
self.profile_path, url=origin,
timestamp=utils.to_datetime(pref_data['last_modified'], self.timezone),
key=f'notifications [in {preferences_file}.profile.content_settings.exceptions]',
value=str(pref_data), interpretation='')
pref_item.row_type += ' (engagement)'
timestamped_preference_items.append(pref_item)
except Exception as e:
log.exception(f' - Exception parsing Preference item: {e})')
@@ -1406,11 +1418,12 @@ def expand_language_code(code):
for origin, pref_data in \
prefs['profile']['content_settings']['exceptions']['permission_autoblocking_data'].items():
if pref_data.get('last_modified') and pref_data.get('last_modified') != '0':
pref_item = Chrome.PreferenceItem(
pref_item = Chrome.SiteSetting(
self.profile_path, url=origin,
timestamp=utils.to_datetime(pref_data['last_modified'], self.timezone),
key=f'permission_autoblocking_data [in {preferences_file}.profile.content_settings.exceptions]',
value=str(pref_data), interpretation='')
pref_item.row_type += ' (engagement)'
timestamped_preference_items.append(pref_item)
except Exception as e:
log.exception(f' - Exception parsing Preference item: {e})')
@@ -1429,11 +1442,12 @@ def expand_language_code(code):
for origin, pref_data in \
prefs['profile']['content_settings']['exceptions']['site_engagement'].items():
if pref_data.get('last_modified'):
pref_item = Chrome.PreferenceItem(
pref_item = Chrome.SiteSetting(
self.profile_path, url=origin,
timestamp=utils.to_datetime(pref_data['last_modified'], self.timezone),
key=f'site_engagement [in {preferences_file}.profile.content_settings.exceptions]',
value=str(pref_data), interpretation='')
pref_item.row_type += ' (engagement)'
timestamped_preference_items.append(pref_item)
except Exception as e:
log.exception(f' - Exception parsing Preference item: {e})')
@@ -1451,11 +1465,12 @@ def expand_language_code(code):
interpretation = ''
if pref_data.get('setting') == 2:
interpretation = 'Muted site'
pref_item = Chrome.PreferenceItem(
pref_item = Chrome.SiteSetting(
self.profile_path, url=origin,
timestamp=utils.to_datetime(pref_data['last_modified'], self.timezone),
key=f'sound [in {preferences_file}.profile.content_settings.exceptions]',
value=str(pref_data), interpretation=interpretation)
pref_item.row_type += ' (engagement)'
timestamped_preference_items.append(pref_item)
except Exception as e:
log.exception(f' - Exception parsing Preference item: {e})')
@@ -1989,7 +2004,7 @@ def get_file_system(self, path, dir_name):
def get_site_characteristics(self, path, dir_name):
result_list = []

self.build_hash_list_of_origins()
self.build_md5_hash_list_of_origins()

log.info('Site Characteristics:')
sc_root_path = os.path.join(path, dir_name)
@@ -2021,9 +2036,12 @@ def get_site_characteristics(self, path, dir_name):
last_loaded = 0

matched_url = self.origin_hashes.get(item['key'].decode(), f'MD5 of origin: {item["key"].decode()}')
result_list.append(Chrome.PreferenceItem(

sc_record = Chrome.SiteSetting(
self.profile_path, url=matched_url, timestamp=utils.to_datetime(last_loaded, self.timezone),
key=f'Status: {item["state"]}', value=str(parsed_proto), interpretation=''))
key=f'Status: {item["state"]}', value=str(parsed_proto), interpretation='')
sc_record.row_type += ' (characteristic)'
result_list.append(sc_record)

except Exception as e:
log.exception(f' - Exception parsing SiteDataProto ({item}): {e}')
Expand All @@ -2032,11 +2050,115 @@ def get_site_characteristics(self, path, dir_name):
self.artifacts_counts['Site Characteristics'] = len(result_list)
self.parsed_artifacts.extend(result_list)

def build_hsts_domain_hashes(self):
domains = set()
for artifact in self.parsed_artifacts:
if isinstance(artifact, self.HistoryItem):
domain = urllib.parse.urlparse(artifact.url).hostname
# Some URLs don't have a domain, like local PDF files
if domain:
domains.add(domain)

for domain in domains:

# From https://source.chromium.org/chromium/chromium/src/+
# /main:net/http/transport_security_state.cc;l=223:
# Converts |hostname| from dotted form ("www.google.com") to the form
# used in DNS: "\x03www\x06google\x03com", lowercases that, and returns
# the result.
domain_parts = domain.lower().split('.')
while len(domain_parts) > 1:
dns_hostname = ''
for domain_part in domain_parts:
dns_hostname += f'{chr(len(domain_part))}{domain_part}'
dns_hostname += chr(0)

# From https://source.chromium.org/chromium/chromium/src/+
# /main:net/http/transport_security_persister.h;l=103:
# The JSON dictionary keys are strings containing
# Base64(SHA256(TransportSecurityState::CanonicalizeHost(domain))).
hashed_domain = base64.b64encode(
hashlib.sha256(dns_hostname.encode()).digest()).decode('utf-8')

# Check if this is new hash (break if not), add it to the dict,
# and then repeat with the leading domain part removed.
if hashed_domain in self.hsts_hashes:
break
self.hsts_hashes[hashed_domain] = '.'.join(domain_parts)
domain_parts = domain_parts[1:]

def get_transport_security(self, path, dir_name):
result_list = []

# Use the URLs from other previously-processed artifacts to generate hashes of domains
# in the form Chrome uses as the 'host' identifier.
self.build_hsts_domain_hashes()

log.info('Transport Security (HSTS):')
ts_file_path = os.path.join(path, dir_name)
log.info(f' - Reading from {ts_file_path}')

# From https://source.chromium.org/chromium/chromium/src/+
# /main:net/http/transport_security_persister.h;l=103:
# The JSON dictionary keys are strings containing
# Base64(SHA256(TransportSecurityState::CanonicalizeHost(domain))).
# The reason for hashing them is so that the stored state does not
# trivially reveal a user's browsing history to an attacker reading the
# serialized state on disk.

with open(ts_file_path, encoding='utf-8', errors='replace') as f:
ts_json = json.loads(f.read())

# As of now (2021), there are two versions of the TransportSecurity JSON file.
# Version 2 has a top level "version" key (with a value of 2), and version 1
# has the HSTS domain hashes as top level keys.

# Version 2
if ts_json.get('version'):
assert ts_json['version'] == 2, '"2" is only supported value for "version"'
hsts = ts_json['sts']

for item in hsts:
if item['host'] in self.hsts_hashes:
hsts_domain = self.hsts_hashes[item['host']]
else:
hsts_domain = f'Encoded domain: {item["host"]}'

hsts_record = Chrome.SiteSetting(
self.profile_path, url=hsts_domain,
timestamp=utils.to_datetime(item['sts_observed'], self.timezone),
key='HSTS observed', value=str(item), interpretation='')
hsts_record.row_type += ' (hsts)'
result_list.append(hsts_record)

# Version 1
elif len(ts_json):
for hashed_domain, domain_settings in ts_json.items():
if hashed_domain in self.hsts_hashes:
hsts_domain = self.hsts_hashes[hashed_domain]
else:
hsts_domain = f'{hashed_domain} (encoded domain)'

hsts_record = Chrome.SiteSetting(
self.profile_path, url=hsts_domain,
timestamp=utils.to_datetime(domain_settings['sts_observed'], self.timezone),
key='HSTS observed', value=f'{hashed_domain}: {domain_settings}', interpretation='')
hsts_record.row_type += ' (hsts)'
result_list.append(hsts_record)

else:
log.warning('Unable to process TransportSecurity file; could not determine version.')
return

log.info(f' - Parsed {len(result_list)} items')
self.artifacts_counts['HSTS'] = len(result_list)
self.parsed_artifacts.extend(result_list)

def process(self):
supported_databases = ['History', 'Archived History', 'Media History', 'Web Data', 'Cookies', 'Login Data',
'Extension Cookies']
supported_subdirs = ['Local Storage', 'Extensions', 'File System', 'Platform Notifications']
supported_jsons = ['Bookmarks'] # , 'Preferences']
supported_jsons = ['Bookmarks', 'TransportSecurity'] # , 'Preferences']
supported_items = supported_databases + supported_subdirs + supported_jsons
log.debug(f'Supported items: {supported_items}')

@@ -2215,6 +2337,13 @@ def process(self):
self.artifacts_display['Site Characteristics'],
self.artifacts_counts.get('Site Characteristics', '0')))

if 'TransportSecurity' in input_listing:
self.get_transport_security(self.profile_path, 'TransportSecurity')
self.artifacts_display['HSTS'] = "HSTS records"
print(self.format_processing_output(
self.artifacts_display['HSTS'],
self.artifacts_counts.get('HSTS', '0')))

if 'File System' in input_listing:
self.get_file_system(self.profile_path, 'File System')
self.artifacts_display['File System'] = 'File System Items'
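For reference, a hedged illustration of the two TransportSecurity layouts that get_transport_security() distinguishes. Only the keys the parser reads are shown ('version', 'sts', 'host', 'sts_observed'); the hash strings and timestamps are placeholders, and real entries typically carry additional fields that the parser simply stores as the record value.

Version 2 (top-level "version" key, HSTS entries in a list under "sts"):

{
    "version": 2,
    "sts": [
        {
            "host": "base64(sha256(canonical host))=",
            "sts_observed": 1632096000.0
        }
    ]
}

Version 1 (the hashed domains themselves are the top-level keys):

{
    "base64(sha256(canonical host))=": {
        "sts_observed": 1632096000.0
    }
}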
