From 03fc3fe691a3ba825b0983bbd64197499e17cb24 Mon Sep 17 00:00:00 2001 From: kdoroszko-splunk Date: Wed, 7 Aug 2024 11:50:56 +0200 Subject: [PATCH] fix: add is_valid_hec function --- pytest_splunk_addon/splunk.py | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/pytest_splunk_addon/splunk.py b/pytest_splunk_addon/splunk.py index 610f3728f..1e77ad0ec 100644 --- a/pytest_splunk_addon/splunk.py +++ b/pytest_splunk_addon/splunk.py @@ -639,6 +639,7 @@ def splunk_external(request): "Could not connect to Splunk HEC" "Please check the log file for possible errors." ) + is_valid_hec(request, splunk_info) return splunk_info @@ -1016,6 +1017,36 @@ def is_responsive(url): return False +def is_valid_hec(request, splunk): + """ + Verify if provided hec token is valid by sending simple post request. + + Args: + splunk (dict): details of the Splunk instance + + Returns: + None + """ + + LOGGER.info( + "Validating HEC token... splunk=%s", + json.dumps(splunk), + ) + response = requests.post( + url=f'{request.config.getoption("splunk_hec_scheme")}://{splunk["forwarder_host"]}:{splunk["port_hec"]}/services/collector/raw', + headers={ + "Authorization": f'Splunk {request.config.getoption("splunk_hec_token")}' + }, + data={"event": "test_hec", "sourcetype": "hec_token_test"}, + verify=False, + ) + LOGGER.debug("Status code: {}".format(response.status_code)) + if response.status_code == 200: + LOGGER.info("Splunk HEC is valid.") + else: + pytest.exit("Exiting pytest due to invalid HEC token value.") + + def pytest_unconfigure(config): if PYTEST_XDIST_TESTRUNUID: if os.path.exists(PYTEST_XDIST_TESTRUNUID + "_wait"):