Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add Splunk HEC token validation for external Splunk #877

Merged
merged 1 commit into from
Aug 7, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions pytest_splunk_addon/splunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,7 @@ def splunk_external(request):
"Could not connect to Splunk HEC"
"Please check the log file for possible errors."
)
is_valid_hec(request, splunk_info)
return splunk_info


Expand Down Expand Up @@ -1016,6 +1017,36 @@ def is_responsive(url):
return False


def is_valid_hec(request, splunk):
"""
Verify if provided hec token is valid by sending simple post request.

Args:
splunk (dict): details of the Splunk instance

Returns:
None
"""

LOGGER.info(
"Validating HEC token... splunk=%s",
json.dumps(splunk),
)
response = requests.post(
url=f'{request.config.getoption("splunk_hec_scheme")}://{splunk["forwarder_host"]}:{splunk["port_hec"]}/services/collector/raw',
headers={
"Authorization": f'Splunk {request.config.getoption("splunk_hec_token")}'
},
data={"event": "test_hec", "sourcetype": "hec_token_test"},
verify=False,
)
LOGGER.debug("Status code: {}".format(response.status_code))
if response.status_code == 200:
LOGGER.info("Splunk HEC is valid.")
else:
pytest.exit("Exiting pytest due to invalid HEC token value.")


def pytest_unconfigure(config):
if PYTEST_XDIST_TESTRUNUID:
if os.path.exists(PYTEST_XDIST_TESTRUNUID + "_wait"):
Expand Down
Loading