Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug] Retrieve email headers #895

Closed
imad-testing opened this issue Nov 13, 2020 · 15 comments
Closed

[Bug] Retrieve email headers #895

imad-testing opened this issue Nov 13, 2020 · 15 comments

Comments

@imad-testing
Copy link

imad-testing commented Nov 13, 2020

Hello, I should build an analyser that retrieve the header of a specific mail on exchange and than I want to analyse the header using mxtoolbox. Can someone help me to do it? I'm stuck.

Thank you.

@imad-testing imad-testing added the category:bug Issue is related to a bug label Nov 13, 2020
@dadokkio dadokkio added help wanted and removed category:bug Issue is related to a bug labels Nov 13, 2020
@dadokkio
Copy link
Contributor

Ok, have you already written something? The problem is exchange connection or processing the email?

@imad-testing
Copy link
Author

Both. I'm not familiar with exchangelib in python but I found something maybe useful to retrieve the header of the last mail in my inbox.

import exchangelib as E

user = 'wgates@microsoft.com'
password = 'hunter2'

class RawHeaders(E.ExtendedProperty):
property_tag = 0x7d
property_type = 'String'
E.items.Message.register('rawheaders', RawHeaders)
E.items.MeetingRequest.register('rawheaders', RawHeaders)

print("Logging in…")
account = E.Account(
primary_smtp_address = user,
autodiscover = True,
credentials = E.Credentials(username = user, password = password),
access_type = E.DELEGATE)

print(account.inbox.all()[0].rawheaders)

@dadokkio
Copy link
Contributor

this is a function I wrote to connect to owa [similar to yours 😄 but probably your is missing some server info]

def authenticate(username, password, server, delegate_account):
    credentials = Credentials(username, password)
    config = Configuration(server=server, credentials=credentials)
    account = Account(
        delegate_account, config=config, autodiscover=False, access_type=DELEGATE
    )
    account.root.refresh()
    return account

Then you can use account.inbox.all()[0] to get the first email in your inbox or if you want sort them in reverse order or filter by unread
account.inbox.filter(is_read=not unread).order_by("-datetime_received")[0]

To get email info I suggest using specialized lib like eml_parser

ep = eml_parser.EmlParser()
ep.decode_email_bytes(account.inbox.all()[0])['header']

@imad-testing
Copy link
Author

imad-testing commented Nov 13, 2020

Thank you. but how should I retrieve the header of a specific mail?

@dadokkio
Copy link
Contributor

What do you mean with specific? you can search by message-id for example if you have it

email = account.inbox.get(message_id=email_id)

@imad-testing
Copy link
Author

Sorry if I was not clear. I mean instead of (account.inbox.all()[0])['header'] that is the first inbox, Can I search for a specific message in my inbox?

@dadokkio
Copy link
Contributor

you can search for whatever, see https://ecederstrand.github.io/exchangelib/#searching for example.

@imad-testing
Copy link
Author

Thank you. I just want to ask what is the parameter of the delegate server that I should put in the json file?

email.py

#!/usr/bin/env python3

encoding: utf-8

from cortexutils.analyzer import Analyzer
import exchangelib as E

class Email_Header_Mxtoolbox(Analyzer):
def init(self):
Analyzer.init(self)
self.username = self.get_param('config.username', None, 'Username needed for querying Exchange server')
self.password = self.get_param('config.password', None, 'Password needed for querying Exchange server')
self.server = self.get_param('config.server', None,'Server needed for querying Exchange server')
self.delegate_account = self.get_param('config.delegate_account', None, 'Delegate account needed for querying Exchange server')

def authenticate(self.username, self.password, self.server, self.delegate_account):
    credentials = Credentials(self.username, self.password)
    config = Configuration(server=self.server, credentials=credentials)
    account = Account( self.delegate_account, config=config, autodiscover=False, access_type=DELEGATE )
    account.root.refresh()
    return account

def run():
    account = authenticate(self.username, self.password, self.server, self.delegate_account)
    last_inbox_message= account.inbox.filter(is_read=not unread).order_by("-datetime_received")[0]
    ep = eml_parser.EmlParser()
    last_inbox_message_header = ep.decode_email_bytes(last_inbox_message)['header']
    print(last_inbox_message_header)

if name == 'main':
Email_Header_Mxtoolbox().run()

email.json

{
"name": "Active Directory Enable Disable",
"version": "1.0",
"author": "Imad Boustany",
"url": "https://github.com/TheHive-Project/Cortex-Analyzers",
"license": "AGPL-v3",
"description": "Analyse email Headers using mxtoolbox",
"baseConfig": "Email_Header_Mxtoolbox",
"dataTypeList": ["email"],
"command": "Email_Header_Mxtoolbox/Email_Header_Mxtoolbox.py",
"configurationItems": [
{
"name": "username",
"description": "MYWINDOMAIN//myusername",
"type": "string",
"multi": false,
"required": true
},
{
"name": "password",
"description": "topsecret",
"type": "string",
"multi": false,
"required": true,
},
{
"name": "server",
"description": "mail.example.com",
"type": "string",
"multi": false,
"required": true,
},
{
"name": "delegate_account",
"description": "mail.example.com",
"type": "string",
"multi": false,
"required": true,
}
]
}

@dadokkio
Copy link
Contributor

delegate_account? For example if you have a functional email you can login with your account and read mail as the functional one (eg. security_mbx@evilcorp.com 😄 ), if not you can put the same value you used as username.

@imad-testing
Copy link
Author

Okey deal. I will test it to verify the header so I can move to next step that is mxtoolbox.
Thank you!! You are really helping me for this project.

@imad-testing
Copy link
Author

imad-testing commented Nov 19, 2020

Hello @dadokkio. I tested my script. I was able to connect to the exchange server and retrieve my email header for a specific subject. But it returns a list thats why the analyser crash. I also tried the parser that you gave me but I could not be able to receive me header in a dictionary form. Can you help me doing this? I should receive a dictionary so when I put "self.report(dictionary) it will return a json and the analyser will be successfull"

Python script

#!/usr/bin/env python3
# encoding: utf-8
from cortexutils.analyzer import Analyzer
from exchangelib import Account, Credentials, Message, Mailbox, DELEGATE, Configuration,EWSTimeZone,EWSDateTime
import eml_parser
import time
import json
class Email_Header_Mxtoolbox(Analyzer):
    def __init__(self):
        Analyzer.__init__(self)
        self.username = self.get_param('config.username', None, 'Username needed for querying Exchange server')
        self.password = self.get_param('config.password', None, 'Password needed for querying Exchange server')
        self.server = self.get_param('config.server', None,'Server needed for querying Exchange server')


    def run(self):
        #Connect to the mail server by providing your credentials
        credentials = Credentials(self.username, self.password)
        config = Configuration(server=self.server, credentials=credentials)
        #Connect ot your exchange account
        account = Account(self.username, config=config, autodiscover=False, access_type=DELEGATE )
        account.root.refresh()
        #Filter the header of a specific message inbox by subject
        Subject = self.get_data()
        inbox_message_header = ""

        for item in account.inbox.all().order_by('-datetime_received')[:100]:
            if item.subject == Subject:
                print(item.headers)
                return 


if __name__ == '__main__':
    Email_Header_Mxtoolbox().run()

json file

{
    "name": "Email Header Mxtoolbox",
    "version": "1.0",
    "author": "Imad Boustany",
    "url": "https://github.com/TheHive-Project/Cortex-Analyzers",
    "license": "AGPL-v3",
    "description": "Analyse email Headers using mxtoolbox",
    "baseConfig": "Email_Header_Mxtoolbox",
    "dataTypeList": ["mail_subject"],
    "command": "Email_Header_Mxtoolbox/Email_Header_Mxtoolbox.py",
    "configurationItems": [
      {
        "name": "username",
        "description": "MYWINDOMAIN\/\/myusername1 or Firstname.Lastname@someenterprise.com",
        "type": "string",
        "multi": false,
        "required": true
      },
      {
        "name": "password",
        "description": "topsecret",
        "type": "string",
        "multi": false,
        "required": true
      },
      {
        "name": "server",
        "description": "shared_mail_box_name@someenterprise.com",
        "type": "string",
        "multi": false,
        "required": true
      },
      {
        "name": "Subject",
        "description": "Type the subject of your mail inbox to extract the Header from it",
        "type": "string",
        "multi": false,
        "required": true
      }

    ]
}

cortex result

image

outlook
image

@dadokkio
Copy link
Contributor

you can also create the dict when you invoke self.report and keep item.headers as a list.
Eg self.report({'headers': item.headers})
In any case since the function is headers it's possible that it will return a list that you can iterate.
This is related to the fact that reply and email attachment are considered part of the same mail so multiple headers are possible.

In any case I'm not really sure how you'll use this analyzer and if I have understood properly this is not the right approach.
Probably an external tool that pool emails and open case in thehive if needed is a better solution (synapse for example) or one of the others SOAR.

@imad-testing
Copy link
Author

**I tried self.report({'headers': item.headers}) but it always give me an error. Okey I will tell you how I should use it. First I want to get the email header, if I found an illigitimate source ip I will run e_discovery in all inboxes, and then search for forwarding rule and finally block the source email. But for now, I should get the email header for my project.
image
Do you have any idea?

@imad-testing
Copy link
Author

I solved the issue Thank you for your help. I will keep you posted when i configure another analyser.

@dadokkio
Copy link
Contributor

Great. I'll close the issue. If you need any other help just ask!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants