forked from MISP/misp-modules
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathemail_import.py
402 lines (340 loc) · 15.9 KB
/
email_import.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
import base64
import io
import zipfile
import codecs
import re
from email import message_from_bytes
from email.utils import parseaddr
from email.iterators import typed_subpart_iterator
from email.parser import Parser
from html.parser import HTMLParser
from email.header import decode_header
# Generic error envelope; not referenced by any other code in this file.
misperrors = {'error': 'Error'}

# Values reported by introspection().
userConfig = {}
inputSource = ['file']

# Module metadata returned by version().
moduleinfo = {
    'version': '0.1',
    'author': 'Seamus Tuohy',
    'description': 'Email import module for MISP',
    'module-type': ['import'],
}

# Configuration options read by handler():
#   unzip_attachments              - unzip every zip attachment that is not
#                                    password protected
#   guess_zip_attachment_passwords - try to open password protected zip files
#                                    using the strings found in the email body
#                                    and subject
#   extract_urls                   - extract the URLs found in the text/html
#                                    parts of the email
moduleconfig = [
    "unzip_attachments",
    "guess_zip_attachment_passwords",
    "extract_urls",
]
# Captures the recipient named in a "Received: ... for <addr>;" trace header.
_RECEIVED_FOR_RE = re.compile(r"for\s(.*@.*);")

# Don't be picky about how the user chooses to say yes to an option.
_CONFIG_YES = frozenset(('y', 'yes', 'true', 't'))

# Attachments with these extensions are never handed to the unzip logic.
_OFFICE_SUFFIXES = tuple(".{0}".format(ext) for ext in (
    "doc", "docx", "dot", "dotx", "xls",
    "xlsx", "xlm", "xla", "xlc", "xlt",
    "xltx", "xlw", "ppt", "pptx", "pps",
    "ppsx", "pot", "potx", "sldx",
    "odt", "ods", "odp", "odg", "odf",
    "fodt", "fods", "fodp", "fodg", "ott",
    "uot"))


def _config_enabled(config, option):
    """Return True when *option* is switched on in the request config."""
    value = config.get(option)
    if isinstance(value, bool):
        return value
    return isinstance(value, str) and value.strip().lower() in _CONFIG_YES


def _decode_headers(message):
    """Replace encoded header values of *message* with their decoded text."""
    decoded = []
    changed = False
    for key, val in message.items():
        replacement = get_decoded_header(key, val)
        if replacement is None:
            decoded.append((key, val))
        else:
            decoded.append((key, replacement))
            changed = True
    if not changed:
        return
    # replace_header() only touches the FIRST header of a given name, which
    # clobbers repeated headers such as Received.  Rebuild the full header
    # list instead so every occurrence keeps its own value and position.
    for key in message.keys():
        del message[key]
    for key, val in decoded:
        message[key] = val


def _header_attributes(message):
    """Build the attributes that come from single-valued headers."""
    results = []
    all_headers = "".join("{0}: {1}\n".format(k.strip(), str(v).strip())
                          for k, v in message.items())
    results.append({"values": all_headers, "type": 'email-header'})

    boundary = message.get_boundary()
    if boundary:
        results.append({"values": boundary, "type": 'email-mime-boundary'})

    in_reply_to = message.get('In-Reply-To')
    if in_reply_to:
        results.append({"values": in_reply_to.strip(), "type": 'email-reply-to'})

    for header_name, attr_type in (('X-Mailer', 'email-x-mailer'),
                                   ('Thread-Index', 'email-thread-index'),
                                   ('Message-ID', 'email-message-id'),
                                   ('Subject', 'email-subject')):
        value = message.get(header_name)
        if value:
            results.append({"values": value, "type": attr_type})

    for header_name, label in (('From', 'From'), ('Return-Path', 'Return Path')):
        raw = message.get(header_name)
        if raw:
            name, addr = parseaddr(raw)
            comment = "{0}: {1}".format(label, raw)
            results.append({"values": addr, "type": 'email-src', "comment": comment})
            results.append({"values": name, "type": 'email-src-display-name', "comment": comment})
    return results


def _recipient_attributes(message):
    """Build email-dst attributes from the To, Cc and Bcc headers."""
    from email.utils import getaddresses

    results = []
    for hdr_val in ('To', 'Cc', 'Bcc'):
        raw_values = [str(v) for v in (message.get_all(hdr_val) or [])]
        # getaddresses() understands quoted display names, so a comma inside
        # "Doe, John" <jd@example.com> no longer splits the address in two.
        for name, addr in getaddresses(raw_values):
            if not (name or addr):
                continue
            shown = "{0} <{1}>".format(name, addr) if name else addr
            comment = "{0}: {1}".format(hdr_val, shown)
            results.append({"values": addr, "type": "email-dst", "comment": comment})
            results.append({"values": name, "type": "email-dst-display-name", "comment": comment})
    return results


def _target_attributes(message):
    """Build target-email attributes from the 'Received' trace headers."""
    targets = {}
    for rec in message.get_all('Received') or []:
        match = _RECEIVED_FOR_RE.search(str(rec))
        if match is None:
            continue
        # dict keys de-duplicate while keeping first-seen order
        targets[parseaddr(match.group(1).strip(' <>'))[1]] = None
    return [{"values": tar, "type": "target-email",
             "comment": "Extracted from email 'Received' header"}
            for tar in targets]


def _unzip_attachment(filename, data, original, message, crack_passwords, cache):
    """Return the files packed inside a zip attachment ([] when unavailable).

    *original* is the attribute dict of the attachment itself; it receives a
    comment describing the outcome when the archive is password protected.
    *cache* keeps the candidate password list so it is only built once.
    """
    import zlib

    try:
        return get_zipped_contents(filename, data)
    except (zipfile.BadZipFile, zlib.error):  # Not a (readable) zipfile
        return []
    except RuntimeError:  # File is encrypted with a password
        pass
    if not crack_passwords:
        return []
    if cache.get("passwords") is None:
        cache["passwords"] = get_zip_passwords(message)
    password = test_zip_passwords(data, cache["passwords"])
    if password is not None:
        try:
            extracted = get_zipped_contents(filename, data, password=password)
        except (RuntimeError, zipfile.BadZipFile, zlib.error):
            # The guessed password passed the cheap check but cannot really
            # decrypt the archive; report it as not cracked.
            pass
        else:
            original['comment'] = """Original Zipped Attachment with Password {0}""".format(password)
            return extracted
    original['comment'] = "Encrypted Zip: Password could not be cracked from message"
    return []


def _html_urls(part, message):
    """Return the href targets found in a text/html message part."""
    payload = part.get_payload(decode=True)
    if payload is None:
        return []
    charset = str(get_charset(part, get_charset(message)))
    try:
        html = payload.decode(charset, errors='replace')
    except LookupError:  # charset name unknown to Python
        html = payload.decode('ascii', errors='replace')
    url_parser = HTMLURLParser()
    url_parser.feed(html)
    return [url for url in url_parser.urls if url]


def handler(q=False):
    """Convert an email into a list of MISP attribute dictionaries.

    Args:
        q (str): JSON encoded request. Its "data" entry holds the base64
            encoded raw email. The optional "config" entry may enable
            "unzip_attachments", "guess_zip_attachment_passwords" and
            "extract_urls" (any of y/yes/true/t, or a JSON boolean).

    Returns:
        False when no request is given, otherwise {'results': [...]} where
        every entry carries at least "values" and "type".
    """
    if q is False:
        return False

    # Decode and parse email
    request = json.loads(q)
    # request data is always base 64 byte encoded
    data = base64.b64decode(request["data"])
    # Double decode to force headers to be re-parsed with proper encoding
    message = Parser().parsestr(message_from_bytes(data).as_string())
    # Decode any encoded headers to get at proper string
    _decode_headers(message)

    results = _header_attributes(message)
    results += _recipient_attributes(message)
    results += _target_attributes(message)

    # Check if we were given a configuration
    config = request.get("config") or {}
    unzip = _config_enabled(config, "unzip_attachments")
    zip_pass_crack = _config_enabled(config, "guess_zip_attachment_passwords")
    extract_urls = _config_enabled(config, "extract_urls")
    password_cache = {}  # Only want to collect password list once

    for part in message.walk():
        filename = part.get_filename()
        if filename is None:
            # Not an attachment: check email body part for urls
            if extract_urls and part.get_content_type() == 'text/html':
                results.extend({"values": url, "type": "url"}
                               for url in _html_urls(part, message))
            continue

        results.append({"values": filename, "type": 'email-attachment'})
        # get_payload(decode=True) yields None for multipart containers
        attachment_data = part.get_payload(decode=True) or b""
        # Base attachment data is default
        original = {"values": filename,
                    "data": base64.b64encode(attachment_data).decode()}
        attachment_files = [original]
        if unzip and not filename.lower().endswith(_OFFICE_SUFFIXES):
            attachment_files += _unzip_attachment(filename, attachment_data,
                                                  original, message,
                                                  zip_pass_crack, password_cache)
        for attch_item in attachment_files:
            attch_item["type"] = 'malware-sample'
            results.append(attch_item)

    return {'results': results}
def get_zipped_contents(filename, data, password=None):
    """Extract the contents of a zipfile.

    Args:
        filename (str): Name of the zip file, only used in the comment of
            each returned entry.
        data (bytes): Raw bytes of the zip archive.
        password (str or bytes): Optional password used to open the members.
            Any password works when the archive is not encrypted.

    Returns:
        A list with one dict per file stored in the archive, e.g.
        {"values": "name_of_file.txt",
         "data": "<base64 encoded file content as str>",
         "comment": "Extracted from <filename>"}
        Directory entries are skipped.

    Raises:
        zipfile.BadZipFile: *data* is not a zip archive (or a member is corrupt).
        RuntimeError: a member is encrypted and the password is missing or wrong.
    """
    if isinstance(password, str):
        password = password.encode()  # Byte encoded password required
    unzipped_files = []
    with zipfile.ZipFile(io.BytesIO(data), "r") as zf:
        # Iterate ZipInfo objects rather than names so that members sharing
        # a name are each read, instead of the last one being read twice.
        for member in zf.infolist():
            if member.is_dir():
                continue  # a directory entry carries no file content
            # NOTE(review): member size is not bounded here; a hostile
            # archive can expand to a very large payload in memory.
            with zf.open(member, mode='r', pwd=password) as fp:
                file_data = fp.read()
            unzipped_files.append({"values": member.filename,
                                   "data": base64.b64encode(file_data).decode(),
                                   "comment": "Extracted from {0}".format(filename)})
    return unzipped_files
def test_zip_passwords(data, test_passwords):
    """Test passwords until one is found to be correct.

    Args:
        data (bytes): Raw bytes of the zip archive.
        test_passwords (iterable): Candidate passwords (str or bytes).

    Returns:
        The first candidate, exactly as given, that decrypts the archive,
        or None when no candidate works or the archive has no members.
    """
    import zlib  # only needed to recognise a garbage stream from a bad key

    with zipfile.ZipFile(io.BytesIO(data), "r") as zf:
        members = zf.infolist()
        if not members:
            return None
        # Bit 0 of the general purpose flags marks an encrypted member.
        # Prefer such a member: a leading unencrypted entry (a directory,
        # for instance) would accept every candidate.
        target = next((m for m in members if m.flag_bits & 0x1), members[0])
        for pw_test in test_passwords:
            byte_pwd = pw_test if isinstance(pw_test, bytes) else pw_test.encode()
            try:
                # open() alone only compares a single check byte, so about
                # one wrong password in 256 gets through.  Reading the member
                # to the end makes zipfile verify the CRC as well.
                with zf.open(target, pwd=byte_pwd) as fp:
                    while fp.read(65536):
                        pass
            except (RuntimeError, zipfile.BadZipFile, zlib.error):  # Incorrect Password
                continue
            return pw_test
    return None


# The "test_" prefix is part of the public name; keep pytest from collecting
# this helper as a test case when it is imported into a test module.
test_zip_passwords.__test__ = False
def get_zip_passwords(message):
    """Parse message for possible zip password combinations.

    Args:
        message (email.message): Email message object to parse.

    Returns:
        A list of unique, non-empty candidate strings: well-known passwords
        first, then strings marked off by quotes or brackets, then the
        individual words of the text parts and the subject.
    """
    def decode_part(part):
        # Decode leniently: a wrong or unknown charset must not abort the
        # whole password search.
        payload = part.get_payload(decode=True)
        if payload is None:
            return ""
        charset = str(get_charset(part, get_charset(message)))
        try:
            return payload.decode(charset, errors="replace")
        except LookupError:
            return payload.decode("ascii", errors="replace")

    # Passwords commonly used for malware
    possible_passwords = ["infected", "malware"]
    # Commonly used passwords
    possible_passwords += ["123456", "password", "12345678", "qwerty",
                           "abc123", "123456789", "111111", "1234567",
                           "iloveyou", "adobe123", "123123", "sunshine",
                           "1234567890", "letmein", "1234", "monkey",
                           "shadow", "12345", "password1",
                           "princess", "azerty", "trustno1", "000000"]

    # Not checking for multi-part message because by having an
    # encrypted zip file it must be multi-part.
    body = [decode_part(part)
            for part in typed_subpart_iterator(message, 'text', 'plain')]
    for part in typed_subpart_iterator(message, 'text', 'html'):
        html_parser = HTMLTextParser()
        html_parser.feed(decode_part(part))
        body.extend(html_parser.text_data)
    raw_text = "\n".join(body).strip()

    # Add subject to text corpus to parse (the header may be absent)
    subject = message.get('Subject')
    if subject:
        raw_text += " " + str(subject)

    # Grab any strings that are marked off by special chars
    for opening, closing in (("'", "'"), ('"', '"'), ('[', ']'), ('(', ')')):
        pattern = "{0}([^{1}]*){1}".format(re.escape(opening), re.escape(closing))
        possible_passwords += re.findall(pattern, raw_text)

    # Individual words, plus the same words with basic punctuation stripped
    # just in case someone places a password in a proper sentence
    individual_words = re.split(r"\s", raw_text)
    possible_passwords += individual_words
    possible_passwords += [word.strip('.,;:?!') for word in individual_words]

    # Drop duplicates and empty strings while keeping a stable order
    return [pw for pw in dict.fromkeys(possible_passwords) if pw]
class HTMLTextParser(HTMLParser):
    """Collect every piece of character data found in an HTML string."""

    def __init__(self, text_data=None):
        super().__init__()
        # A caller supplied list is used as-is, so it receives the results.
        self.text_data = [] if text_data is None else text_data

    def handle_data(self, data):
        """Store one run of text reported by the parser."""
        self.text_data.append(data)
class HTMLURLParser(HTMLParser):
    """Parse all href targets of anchor tags from HTML strings."""

    def __init__(self, urls=None):
        super().__init__()
        # A caller supplied list is used as-is, so it receives the results.
        self.urls = [] if urls is None else urls

    def handle_starttag(self, tag, attrs):
        """Record the href of an <a> tag; anchors without a target are ignored."""
        if tag != 'a':
            return
        href = dict(attrs).get('href')
        # <a name="..."> and a bare <a href> carry no URL; do not store None
        if href:
            self.urls.append(href)
def get_charset(message, default="ascii"):
    """Get a message object's charset as a codec name.

    Args:
        message (email.message): Email message object to parse.
        default (string): String containing default charset to return.

    Returns:
        The charset parameter of the Content-Type header if present, else
        the name of the charset set on the message, else *default*.
    """
    content_charset = message.get_content_charset()
    if content_charset:
        return content_charset
    charset = message.get_charset()
    if charset:
        # get_charset() yields an email.charset.Charset instance, which
        # bytes.decode() rejects; its string form is the codec name.
        return str(charset)
    return default
def _decode_header_chunk(raw, charset):
    """Decode one (bytes, charset) pair produced by decode_header()."""
    if isinstance(raw, str):
        return raw
    if charset is None:
        # Plain text between encoded words: decode_header() hands it over
        # as raw-unicode-escape bytes, which latin-1 maps back byte for byte.
        return raw.decode('latin-1')
    # Try various UTF decodings for any unknown 8bit encodings
    # (32 before 16 so it raises errors)
    if charset == 'unknown-8bit':
        candidates = ('utf-8', 'utf-32', 'utf-16')
    else:
        candidates = (charset,)
    for codec in candidates:
        # Remove Byte Order Mark (BOM) from UTF-8 strings. The utf-16 and
        # utf-32 codecs consume a leading BOM themselves; stripping those
        # byte patterns anywhere in the data could cut characters in half.
        chunk = raw.replace(codecs.BOM_UTF8, b"") if codec == 'utf-8' else raw
        try:
            return chunk.decode(codec)
        except (UnicodeDecodeError, LookupError):
            continue
    # Better to have the analyst decode themselves than to provide a
    # mangled string: keep ASCII and show every other byte as \xNN.
    return raw.decode('ascii', errors='backslashreplace')


def get_decoded_header(header, value):
    """Decode an RFC 2047 encoded header value.

    Args:
        header (str): Name of the header (not used for decoding).
        value (str): Raw header value.

    Returns:
        The decoded text with surrounding ASCII whitespace removed, or None
        when *value* holds no encoded words (or they cannot be parsed) and
        should be kept as it is.
    """
    from email.errors import HeaderParseError

    try:
        chunks = decode_header(value)
    except HeaderParseError:  # e.g. broken base64 inside an encoded word
        return None
    # decode_header() returns str chunks only when nothing was encoded
    if not any(isinstance(raw, bytes) for raw, _ in chunks):
        return None
    # A header may mix plain text and several encoded words; decode all of
    # them rather than just the first chunk.
    decoded = "".join(_decode_header_chunk(raw, charset) for raw, charset in chunks)
    return decoded.strip(" \t\r\n\x0b\x0c")
def introspection():
    """Report the module-level 'userConfig' and 'inputSource' settings.

    A setting that is not defined in the module is left out of the result.
    """
    module_scope = globals()
    return {key: module_scope[key]
            for key in ('userConfig', 'inputSource')
            if key in module_scope}
def version():
    """Return the module metadata with the supported config options attached.

    The module-level 'moduleinfo' dict is updated in place and returned.
    """
    moduleinfo.update(config=moduleconfig)
    return moduleinfo
if __name__ == '__main__':
    # Smoke run against a sample message.  handler() expects a JSON request
    # whose "data" field is the base64 encoded raw email, so the file is read
    # as bytes and wrapped accordingly instead of being passed in as text.
    with open('tests/test_no_attach.eml', 'rb') as email_file:
        handler(q=json.dumps({"data": base64.b64encode(email_file.read()).decode()}))