import demistomock as demisto # noqa: F401
from CommonServerPython import * # noqa: F401
import datetime as dt
from typing import TYPE_CHECKING, Iterator, cast
from AWSApiModule import *
# The following imports are used only for type hints and autocomplete.
# They are not used at runtime and do not exist in the docker image.
if TYPE_CHECKING:
    from mypy_boto3_securityhub import SecurityHubClient
    from mypy_boto3_securityhub.type_defs import AwsSecurityFindingTypeDef

# Vendor/product pair under which events are sent to XSIAM (send_events_to_xsiam).
VENDOR = 'AWS'
PRODUCT = 'Security Hub'
# Finding field used for sorting, for the time filter, and copied into each event's "_time" key.
TIME_FIELD = 'CreatedAt'
# Format of the Start/End values in the time filter sent to get_findings().
DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
# Fallback for the 'first_fetch' parameter (how far back the very first fetch looks).
DEFAULT_FIRST_FETCH = '3 days'
# Fallback for the 'max_fetch' parameter (upper bound on findings per fetch-events run).
DEFAULT_MAX_RESULTS = 1000
API_MAX_PAGE_SIZE = 100 # The API only allows a maximum of 100 results per request. Using more raises an error.
def generate_last_run(events: list["AwsSecurityFindingTypeDef"]) -> dict[str, Any]:
    """
    Build the last-run object for the next fetch from the events of this fetch.

    Every event is stamped in place with a "_time" key copied from TIME_FIELD. The
    events are expected to arrive sorted ascending by TIME_FIELD, so the last one
    carries the newest timestamp.

    Args:
        events (list[dict]): Non-empty list of findings fetched in this run.

    Note:
        The API time filter is inclusive ("equal or greater than"), so the next fetch
        would return the findings sharing the newest timestamp again. Their IDs are
        recorded here so the next fetch can exclude them.

    Returns:
        dict: Last run object with 'last_update_date' and 'last_update_date_finding_ids'.
    """
    newest_timestamp = events[-1].get(TIME_FIELD)
    # Once "_time" is added the entries are plain dicts rather than "AwsSecurityFindingTypeDef".
    mutable_events = cast(list[dict[str, Any]], events)
    newest_ids: list[str] = []
    for finding in mutable_events:
        timestamp = finding[TIME_FIELD]
        finding['_time'] = timestamp
        if timestamp == newest_timestamp:
            newest_ids.append(finding['Id'])
    return {
        'last_update_date': newest_timestamp,
        'last_update_date_finding_ids': newest_ids,
    }
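def get_events(client: "SecurityHubClient", start_time: dt.datetime | None = None,
               end_time: dt.datetime | None = None, id_ignore_list: list[str] | None = None,
               page_size: int = API_MAX_PAGE_SIZE, limit: int = 0) -> Iterator[List["AwsSecurityFindingTypeDef"]]:
    """
    Fetch findings from AWS Security Hub, one API page per iteration.

    Args:
        client (SecurityHubClient): Boto3 client to use.
        start_time (datetime | None, optional): Start time to fetch events from. Required if end_time is set.
        end_time (datetime | None, optional): Time to fetch events until. Defaults to the current UTC time.
        id_ignore_list (list[str] | None, optional): List of finding IDs to exclude from the results.
            Defaults to None.
        page_size (int, optional): Number of results to request per API call. Defaults to API_MAX_PAGE_SIZE.
        limit (int, optional): Maximum number of results to fetch in total. Defaults to 0 (no limit).

    Yields:
        list[AwsSecurityFindingTypeDef]: The findings of a single API page (may be empty).

    Raises:
        ValueError: If end_time is given without start_time, or page_size is outside 1..API_MAX_PAGE_SIZE.
    """
    if end_time and not start_time:
        raise ValueError('start_time must be set if end_time is used.')
    if not 1 <= page_size <= API_MAX_PAGE_SIZE:
        raise ValueError(f'page_size must be between 1 and {API_MAX_PAGE_SIZE}, got {page_size}.')

    kwargs: dict = {'SortCriteria': [{'Field': TIME_FIELD, 'SortOrder': 'asc'}]}
    filters: dict = {}
    if start_time:
        # Security Hub timestamps are UTC ('Z' suffix), so the default upper bound must be UTC too;
        # a naive local-time datetime.now() would shift the window on hosts not running in UTC.
        end_bound = (end_time.strftime(DATETIME_FORMAT) if end_time
                     else dt.datetime.now(dt.timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ'))
        filters[TIME_FIELD] = [{'Start': start_time.strftime(DATETIME_FORMAT), 'End': end_bound}]
    if id_ignore_list:
        filters['Id'] = [{'Value': event_id, 'Comparison': 'NOT_EQUALS'} for event_id in id_ignore_list]
    if filters:
        # Filters is only sent when non-empty: get_findings() rejects Filters=None.
        kwargs['Filters'] = filters

    count = 0
    while True:
        # Shrink the final page so the total never exceeds the limit.
        kwargs['MaxResults'] = min(page_size, limit - count) if limit else page_size
        response = client.get_findings(**kwargs)
        page = response.get('Findings', [])
        count += len(page)
        yield page
        if 'NextToken' not in response or (limit and count >= limit):
            break
        kwargs['NextToken'] = response['NextToken']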
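def fetch_events(client: "SecurityHubClient", last_run: dict, first_fetch_time: dt.datetime | None,
                 page_size: int = API_MAX_PAGE_SIZE, limit: int = 0
                 ) -> tuple[list["AwsSecurityFindingTypeDef"], dict, Exception | None]:
    """
    Fetch new findings since the last run (or since first_fetch_time on the first run).

    Findings gathered before an error are kept and the next-run object is computed from
    them, so a partial fetch still advances the checkpoint. The error is returned rather
    than raised so the caller decides how to report it after persisting the checkpoint.

    Args:
        client (SecurityHubClient): Boto3 client to use.
        last_run (dict): Last run object ('last_update_date' and 'last_update_date_finding_ids')
            as produced by generate_last_run(); empty on the first run.
        first_fetch_time (datetime | None): On the first run (no 'last_update_date'), fetch findings
            created from this datetime on.
        page_size (int, optional): Number of results to fetch per request. Defaults to API_MAX_PAGE_SIZE.
        limit (int, optional): Maximum number of events to fetch. Defaults to 0 (no limit).

    Returns:
        tuple[list, dict, Exception | None]: The fetched events (stamped with "_time" when non-empty),
        the next-run object to persist, and the exception raised during the fetch, if any.
    """
    last_update_date = last_run.get('last_update_date')
    start_time = parse_date_string(last_update_date) if last_update_date else first_fetch_time
    id_ignore_list: list = last_run.get('last_update_date_finding_ids', [])

    events: list["AwsSecurityFindingTypeDef"] = []
    error: Exception | None = None
    try:
        for events_batch in get_events(client=client, start_time=start_time, id_ignore_list=id_ignore_list,
                                       page_size=page_size, limit=limit):
            events.extend(events_batch)
    except Exception as e:
        # Keep what was fetched so far; the caller reports the partial failure.
        # (The message parts are separated explicitly - adjacent f-strings do not add spaces.)
        demisto.error(f'Error while fetching events. '
                      f'Events fetched so far: {len(events)}. '
                      f'Error: {e}')
        error = e

    # --- Set next_run data ---
    if events:
        demisto.info(f'Fetched {len(events)} findings.')
        next_run = generate_last_run(events)
        demisto.info(f'Last run data updated to: {next_run}.')
    else:
        demisto.info('No new findings were found.')
        next_run = last_run
    return events, next_run, error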
def get_events_command(client: "SecurityHubClient", should_push_events: bool,
                       page_size: int, limit: int = 0) -> CommandResults:
    """
    Run the aws-securityhub-get-events command: fetch findings and render them as a table.

    Args:
        client (SecurityHubClient): Boto3 client to use.
        should_push_events (bool): Whether to also send the fetched events to XSIAM.
        page_size (int): Number of results to fetch per request.
        limit (int, optional): Maximum number of events to fetch. Defaults to 0 (no limit).

    Returns:
        CommandResults: Human-readable table of the fetched events.
    """
    pages = get_events(client=client, page_size=page_size, limit=limit)
    events = [finding for page in pages for finding in page]
    if should_push_events:
        send_events_to_xsiam(events, vendor=VENDOR, product=PRODUCT)
    table = tableToMarkdown('AWS Security Hub Events', events, sort_headers=False)
    return CommandResults(readable_output=table)
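def main():  # pragma: no cover
    """
    Integration entry point: build the Security Hub client from the integration
    parameters and dispatch the requested command.

    Exceptions raised inside the command dispatch are reported through return_error()
    so the failure reaches the user instead of crashing the container.
    """
    params = demisto.params()
    args = demisto.args()
    command = demisto.command()

    aws_role_arn = params.get('role_arn')
    aws_role_session_name = params.get('role_session_name')
    aws_default_region = params.get('default_region')
    aws_role_session_duration = params.get('role_session_duration')
    aws_access_key_id = demisto.get(params, 'credentials.identifier')
    aws_secret_access_key = demisto.get(params, 'credentials.password')
    # TLS verification stays on unless the user explicitly opts out via the 'insecure' checkbox;
    # a missing parameter must not silently disable certificate checks.
    verify_certificate = not params.get('insecure', False)
    timeout = params.get('timeout')
    retries = params.get('retries', 5)

    limit = arg_to_number(params.get('max_fetch')) or DEFAULT_MAX_RESULTS
    if limit <= 0:
        raise ValueError("Max fetch value cannot be lower than 1.")
    # How far back to look on the very first fetch (no last-run checkpoint yet).
    first_fetch_time = arg_to_datetime(
        arg=params.get('first_fetch', DEFAULT_FIRST_FETCH),
        arg_name='First fetch time',
        required=True
    )

    try:
        validate_params(
            aws_default_region=aws_default_region,
            aws_role_arn=aws_role_arn,
            aws_role_session_name=aws_role_session_name,
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
        )
        aws_client = AWSClient(
            aws_default_region=aws_default_region,
            aws_role_arn=aws_role_arn,
            aws_role_session_name=aws_role_session_name,
            aws_role_session_duration=aws_role_session_duration,
            aws_role_policy=None,
            aws_access_key_id=aws_access_key_id,
            aws_secret_access_key=aws_secret_access_key,
            verify_certificate=verify_certificate,
            timeout=timeout,
            retries=retries,
        )
        client: "SecurityHubClient" = aws_client.aws_session(
            service='securityhub',
            region=aws_default_region,
            role_arn=aws_role_arn,
            role_session_name=aws_role_session_name,
            role_session_duration=aws_role_session_duration,
        )

        demisto.info(f'Executing "{command}" command...')
        if command == 'test-module':
            # One page of a single finding is enough to prove the credentials and permissions work.
            next(get_events(client=client, limit=1))
            return_results('ok')

        elif command == 'aws-securityhub-get-events':
            should_push_events = argToBoolean(args.get('should_push_events', False))
            page_size = arg_to_number(args.get('page_size', API_MAX_PAGE_SIZE))
            # MaxResults must be within 1..API_MAX_PAGE_SIZE; the API rejects anything else.
            if page_size is None or not 1 <= page_size <= API_MAX_PAGE_SIZE:
                raise ValueError(f'Page size must be between 1 and {API_MAX_PAGE_SIZE} (not supported by the API).')
            # A missing or zero 'limit' argument falls back to 1; only negative values are rejected.
            events_limit = arg_to_number(args.get('limit')) or 1
            if events_limit <= 0:
                raise ValueError("Limit cannot be lower than 1.")
            return_results(
                get_events_command(client=client,
                                   should_push_events=should_push_events,
                                   page_size=page_size,
                                   limit=events_limit))

        elif command == 'fetch-events':
            events, next_run, error = fetch_events(
                client=client,
                last_run=demisto.getLastRun(),
                first_fetch_time=first_fetch_time,
                limit=limit,
            )
            # Push first, then persist the checkpoint: a failed push must not advance last run.
            send_events_to_xsiam(events=events, vendor=VENDOR, product=PRODUCT)
            demisto.setLastRun(next_run)
            if error and events:
                raise Exception(f'An error occurred while running fetch-events. '
                                f'The operation was partially successful, but failed midway.\n'
                                f'A total of {len(events)} events were successfully fetched '
                                f'before the error occurred.') from error
            elif error:
                raise error

        else:
            raise NotImplementedError(f'Command "{command}" is not implemented.')

    # Log exceptions and return errors
    except Exception as e:
        return_error(f'Failed to execute {command} command.\nError:\n{e}')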
# Script entry point. Besides '__main__', the builtin module names are accepted because,
# presumably, the server-side runtime executes the script under one of them - confirm against the platform.
if __name__ in ('__main__', '__builtin__', 'builtins'):
    main()