# Import order is kept as-is: the two star imports (common_methods, then
# libtaxii.constants) shadow each other in that order and must not be swapped.
from common_methods import *
import libtaxii as t
import libtaxii.clients as tc
import libtaxii.messages_11 as tm11
from libtaxii.constants import *
from stix.core import STIXPackage
import requests
import io
import json
import uuid
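def poll_feed(settings, subscription):
    """Poll one TAXII 1.1 collection and extract the supported observables.

    Args:
        settings: dict with ``server`` (TAXII host), ``username`` and
            ``password`` (HTTP basic auth), ``supported_objects`` (maps a
            CybOX ``xsi:type`` to the one property name to extract for it)
            and ``subscriptions``.
        subscription: key into ``settings['subscriptions']``; that entry
            supplies ``collection_name`` and ``subscription_id``.

    Returns:
        dict mapping each supported ``xsi:type`` seen in the feed to a list
        of the configured property's values. A type is keyed (possibly with
        an empty list) as soon as one of its objects is seen, even if none
        of them carried the configured property.
    """
    client = tc.HttpClient()
    client.set_auth_type(tc.HttpClient.AUTH_BASIC)
    client.set_use_https(True)
    # NOTE(review): libtaxii's HttpClient appears not to verify the server
    # certificate unless explicitly configured (``set_verify_server`` in
    # recent releases). Basic-auth credentials over an unverified TLS
    # session are exposed to an on-path attacker -- confirm for the
    # installed version and enable verification.
    client.set_auth_credentials({'username': settings['username'],
                                 'password': settings['password']})

    sub = settings['subscriptions'][subscription]
    poll_request = tm11.PollRequest(message_id=uuid.uuid4().hex,
                                    collection_name=sub['collection_name'],
                                    subscription_id=sub['subscription_id'])
    http_resp = client.call_taxii_service2(settings['server'], '/taxii-data/',
                                           VID_TAXII_XML_11, poll_request.to_xml())
    taxii_message = t.get_message_from_http_response(http_resp, poll_request.message_id)

    supported = settings['supported_objects']
    observables = {}
    response = json.loads(taxii_message.to_json())
    for block in response.get('content_blocks', []):
        package = _stix_package_dict(block['content'])
        for obs in package.get('observables', {}).get('observables', []):
            props = obs.get('object', {}).get('properties', {})
            ot = props.get('xsi:type')
            if ot not in supported:
                continue
            values = observables.setdefault(ot, [])
            # Only one property per object type is extracted; that is all
            # ``supported_objects`` can express.
            wanted = supported[ot]
            if wanted in props:
                values.append(props[wanted])
    return observables


def _stix_package_dict(content):
    """Parse one TAXII content block (STIX XML) entirely in memory.

    ``content`` is the raw XML as carried in the poll response's JSON
    (unicode or bytes). Parsing from a buffer replaces the previous fixed
    ``/tmp/indicator.xml`` scratch file, which was never explicitly closed,
    sat at a predictable path in a world-writable directory (a pre-planted
    symlink there would have redirected the write) and left remote feed
    data on disk.
    """
    if not isinstance(content, bytes):
        content = content.encode('utf-8')
    # NOTE(review): this XML comes from a remote feed; the parser settings
    # used by python-stix/lxml (entity expansion, external entities) are
    # not visible here -- confirm they are hardened before pointing the
    # poller at a less-trusted server.
    return STIXPackage.from_xml(io.BytesIO(content)).to_dict()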
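def _crits_objects(settings, endpoint, limit=0):
    """Yield every object from a paginated CRITs API listing endpoint.

    Shared pagination for the ``list_*`` exporters below, which previously
    each carried their own copy of this loop.

    Args:
        settings: dict with ``url`` (API base ending in ``/``), ``username``,
            ``api_key`` and ``offset``, which -- despite its name -- is used
            as the page size. Optional ``verify`` is passed straight to
            ``requests`` (``True``, ``False`` or a CA-bundle path).
        endpoint: resource path appended to the base, e.g. ``'ips/'``.
        limit: when non-zero, pages are requested while the page offset is
            ``<= limit`` (so up to one page beyond it can be returned --
            this is the pre-existing semantics, not a hard cap on results);
            when zero, every page up to the server-reported
            ``meta.total_count`` is fetched.

    Raises:
        ValueError: if the page size is not positive (the loop could never
            advance).
        requests.HTTPError / RuntimeError: on any non-200 response. Before,
            a failed page either spun forever or left the pagination state
            undefined; silently truncating the export is worse than failing.
    """
    page_size = settings['offset']
    if page_size <= 0:
        raise ValueError("settings['offset'] (page size) must be positive")
    # NOTE(review): the API key travels in the query string and so lands in
    # proxy and server access logs; if this CRITs deployment accepts the
    # tastypie ``Authorization: ApiKey user:key`` header, prefer that.
    params = {'username': settings['username'], 'api_key': settings['api_key'],
              'limit': page_size, 'offset': 0}
    url = settings['url'] + endpoint
    # TLS verification is on unless the deployment opts out; the previous
    # hard-coded ``verify=False`` handed the API key to any on-path host.
    # Private CAs should set ``verify`` to their bundle path, not False.
    verify = settings.get('verify', True)
    bound = limit if limit else page_size
    while params['offset'] <= bound:
        r = requests.get(url, params=params, verify=verify)
        if r.status_code != 200:
            r.raise_for_status()
            raise RuntimeError("unexpected HTTP %d from %s" % (r.status_code, url))
        res = r.json()
        for obj in res['objects']:
            yield obj
        params['offset'] += page_size
        if not limit:
            bound = res['meta']['total_count']


def _is_actionable(obj):
    """True when a CRITs object is tied to a campaign at medium/high confidence.

    ``get_intel_confidence`` comes from ``common_methods``; low and unknown
    confidence are deliberately dropped.
    """
    return bool(obj['campaign']) and get_intel_confidence(obj) in ('medium', 'high')


def list_indicators(settings):
    """Export the CRITs indicators linked to a campaign at medium/high confidence.

    Walks every page of ``indicators/`` (see ``_crits_objects``) and returns
    the raw API objects as a list.
    """
    return [o for o in _crits_objects(settings, 'indicators/') if _is_actionable(o)]


def list_ips(settings, limit=0):
    """Export the CRITs IPs linked to a campaign at medium/high confidence.

    ``limit`` bounds the page offset requested, not the number of results;
    see ``_crits_objects``.
    """
    return [o for o in _crits_objects(settings, 'ips/', limit) if _is_actionable(o)]


def list_fqdns(settings, limit=0):
    """Export the CRITs domains linked to a campaign at medium/high confidence.

    ``limit`` bounds the page offset requested, not the number of results;
    see ``_crits_objects``.
    """
    return [o for o in _crits_objects(settings, 'domains/', limit) if _is_actionable(o)]


def list_samples(settings, limit=0):
    """Export the CRITs samples linked to a campaign at medium/high confidence.

    A sample must additionally carry an ``md5`` or a ``filename``. The old
    test referenced an undefined ``potential_results['fiename']`` and raised
    NameError on the first sample without an MD5. ``limit`` bounds the page
    offset requested, not the number of results; see ``_crits_objects``.
    """
    return [o for o in _crits_objects(settings, 'samples/', limit)
            if _is_actionable(o) and (o.get('md5') or o.get('filename'))]


def list_targets(settings):
    """Export every target in CRITs (no campaign/confidence filtering)."""
    return list(_crits_objects(settings, 'targets/'))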