Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async support in Datadog connector #1492

Merged
merged 6 commits into from
May 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion stix_shifter_modules/datadog/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
datadog_api_client==1.2.0
datadog_api_client[async]==2.12.0
urllib3==1.26.15
48 changes: 36 additions & 12 deletions stix_shifter_modules/datadog/stix_transmission/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,17 @@
from urllib3.exceptions import MaxRetryError
import urllib3

from stix_shifter_utils.utils import logger
from stix_shifter_utils.utils.error_response import ErrorResponder

import asyncio

class APIClient:

def __init__(self, connection, configuration):
self.logger = logger.set_logger(__name__)
self.connector = __name__.split('.')[1]

self.connection = connection
self.auth = configuration.get('auth')
self.configuration = datadog_api_client.v1.Configuration(host=connection["site_url"])
Expand All @@ -21,25 +29,41 @@ def __init__(self, connection, configuration):
self.configuration.verify_ssl = False
urllib3.disable_warnings()

def ping_data_source(self):
async def ping_data_source(self):
"""To Validate API key"""
# Enter a context with an instance of the API client
return_obj = {"code": 200}
with datadog_api_client.v1.ApiClient(self.configuration) as api_client:
# return_obj = {"code": 200}
return_obj = {}
response_dict = {}
async with datadog_api_client.v1.AsyncApiClient(self.configuration) as api_client:
# Create an instance of the API class
api_instance = events_api.EventsApi(api_client)
current_time = int(time.time())
try:
# There is no any specific Datadog endpoint which validate application key
api_instance.list_events(start=current_time, end=current_time)
await api_instance.list_events(start=current_time, end=current_time)
return_obj['success'] = True
except MaxRetryError as e:
e.status = 1004
return_obj.update({"code": e.status, "message": e.reason})
self.logger.error('error when pinging datasource {}:'.format(e.reason))
response_dict['type'] = 'MaxRetryError'
response_dict['message'] = 'Server error {}'.format(e.reason)
ErrorResponder.fill_error(return_obj, response_dict, ['message'], connector=self.connector)
except datadog_api_client.v1.ApiException as e:
return_obj.update({"code": e.status, "message": e.reason})
self.logger.error('error when pinging datasource: {}'.format(e.reason))
response_dict['code'] = e.status
response_dict['type'] = 'ServerError'
response_dict['message'] = 'Server error: {}'.format(e.reason)
ErrorResponder.fill_error(return_obj, response_dict, ['message'], connector=self.connector)
except Exception as ex:
self.logger.error('error when pinging datasource: {}'.format(ex))
response_dict['code'] = ex.errno
response_dict['type'] = 'ConnectionError'
response_dict['message'] = 'Server error: {}'.format(ex.strerror)
ErrorResponder.fill_error(return_obj, response_dict, ['message'], connector=self.connector)
return return_obj

def get_search_results(self, search_id, page=None):
async def get_search_results(self, search_id, page=None):
"""get the response from Datadog endpoints
:param search_id: dict, filter parameters
:param page: int,length value
Expand All @@ -49,16 +73,16 @@ def get_search_results(self, search_id, page=None):
search_id.update({"exclude_aggregate": True, "page": page})
if "source" in search_id:
search_id["sources"] = search_id.pop("source")
with datadog_api_client.v1.ApiClient(self.configuration) as api_client:
async with datadog_api_client.v1.AsyncApiClient(self.configuration) as api_client:
api_instance = events_api.EventsApi(api_client)
try:
api_response = api_instance.list_events(**search_id)
api_response = await api_instance.list_events(**search_id)
return_obj.update({"data": api_response})
except datadog_api_client.v1.ApiException as e:
return_obj.update({"code": e.status, "message": e.reason})
return return_obj

def get_processes_results(self):
async def get_processes_results(self):
return_obj = {"code": 200}
configuration = datadog_api_client.v2.Configuration(host=self.connection["site_url"])
configuration.api_key["apiKeyAuth"] = self.auth["api_key"]
Expand All @@ -67,10 +91,10 @@ def get_processes_results(self):
configuration.ssl_ca_cert = self.connection["selfSignedCert"]
else:
configuration.verify_ssl = False
with datadog_api_client.v2.ApiClient(configuration) as api_client:
async with datadog_api_client.v2.AsyncApiClient(configuration) as api_client:
api_instance = processes_api.ProcessesApi(api_client)
try:
api_response = api_instance.list_processes()
api_response = await api_instance.list_processes()
return_obj.update({"data": api_response})
except datadog_api_client.v2.ApiException as e:
return_obj.update({"code": e.status, "message": e.reason})
Expand Down
29 changes: 9 additions & 20 deletions stix_shifter_modules/datadog/stix_transmission/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,27 +12,16 @@ def __init__(self, connection, configuration):
self.connector = __name__.split('.')[1]

async def ping_connection(self):
try:
response = self.api_client.ping_data_source()
# Construct a response object
return_obj = dict()
if response["code"] == 200:
return_obj['success'] = True
else:
ErrorResponder.fill_error(return_obj, response, ['message'], connector=self.connector)
return return_obj
except Exception as err:
self.logger.error('error when pinging datasource {}:'.format(err))
raise
return await self.api_client.ping_data_source()

async def create_results_connection(self, query_expr, offset, length):
payload = json.loads(query_expr)
if payload['source'] == 'events':
return self.get_events(payload, offset, length)
return await self.get_events(payload, offset, length)
else:
return self.get_processes(payload, offset, length)
return await self.get_processes(payload, offset, length)

def get_events(self, query_expr, offset, length):
async def get_events(self, query_expr, offset, length):
length = int(length)
offset = int(offset)

Expand All @@ -42,15 +31,15 @@ def get_events(self, query_expr, offset, length):
# Separate out api supported url params
query_expr, filter_attr = Connector.modify_query_expr(query_expr['query'])
# Grab the response, extract the response code, and convert it to readable json
response_dict = self.api_client.get_search_results(query_expr)
response_dict = await self.api_client.get_search_results(query_expr)
event_list = []
return_obj = dict()
if response_dict["code"] == 200:
response = response_dict["data"]["events"]
response_list = response
page = 1
while len(response) == 1000 and total_records > len(response_list):
response = self.api_client.get_search_results(query_expr, page=page)
response = await self.api_client.get_search_results(query_expr, page=page)
response = response["data"]["events"]
response_list = response_list + response
page = page + 1
Expand All @@ -73,7 +62,7 @@ def get_events(self, query_expr, offset, length):
self.logger.error(traceback.print_stack())
raise

def get_processes(self, query_expr, offset, length):
async def get_processes(self, query_expr, offset, length):
length = int(length)
offset = int(offset)

Expand All @@ -83,15 +72,15 @@ def get_processes(self, query_expr, offset, length):
# Separate out api supported url params
query_expr, filter_attr = Connector.modify_query_expr(query_expr['query'])
# Grab the response, extract the response code, and convert it to readable json
response_dict = self.api_client.get_processes_results()
response_dict = await self.api_client.get_processes_results()
process_list = []
return_obj = dict()
if response_dict["code"] == 200:
response = response_dict["data"]["data"]
response_list = response
page = 1
while len(response) == 1000 and total_records > len(response_list):
response = self.api_client.get_processes_results()
response = await self.api_client.get_processes_results()
response = response["data"]["data"]
response_list = response_list + response
page = page + 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
403: ErrorCode.TRANSMISSION_FORBIDDEN,
# A request parameter is not valid
400: ErrorCode.TRANSMISSION_INVALID_PARAMETER,
1004: ErrorCode.TRANSMISSION_AUTH_SSL
1004: ErrorCode.TRANSMISSION_AUTH_SSL,
500: ErrorCode.TRANSMISSION_CONNECT,
8: ErrorCode.TRANSMISSION_REMOTE_SYSTEM_IS_UNAVAILABLE
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,22 @@ def test_is_async(self):

@patch('stix_shifter_modules.datadog.stix_transmission.api_client.APIClient.ping_data_source')
def test_ping(self, mock_generate_token):
mocked_return_value = {"code": 200}
mocked_return_value = {"success": True}
mock_generate_token.return_value = mocked_return_value
entry_point = EntryPoint(self.connection(), self.configuration())
ping_result = run_in_thread(entry_point.ping_connection)
assert ping_result["success"] is True

@patch('stix_shifter_modules.datadog.stix_transmission.api_client.APIClient.ping_data_source')
def test_ping_endpoint_exception(self, mock_generate_token):
mocked_return_value = {"code": 403, "message": "forbidden"}
mocked_return_value = {"success": False,"code": "forbidden", "error": "datadog connector error => Server error: Forbidden"}
mock_generate_token.return_value = mocked_return_value

entry_point = EntryPoint(self.connection(), self.configuration())
ping_response = run_in_thread(entry_point.ping_connection)

assert ping_response['success'] is False
assert ping_response['error'] == "datadog connector error => forbidden"
assert ping_response['error'] == "datadog connector error => Server error: Forbidden"
assert ping_response['code'] == ErrorCode.TRANSMISSION_FORBIDDEN.value

@patch('stix_shifter_modules.datadog.stix_transmission.api_client.APIClient.ping_data_source')
Expand Down