This repository has been archived by the owner on Jul 16, 2024. It is now read-only.

Merge pull request #12 from target/splunkresultsapi
Multiprocessing support for SplunkDF
DavidJBianco authored Jun 19, 2020
2 parents 3974c82 + 21d1ae2 commit d3a029c
Showing 11 changed files with 3,000,426 additions and 107 deletions.
21 changes: 21 additions & 0 deletions README.md
@@ -198,6 +198,27 @@ df = s.search_df(
```
In the event you need more control over which "internal" fields to drop, you can pass a comma-separated list of field names (NOTE: these can be any field, not just Splunk internal fields).
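
For example, to drop a particular set of fields by name, you might pass something like the call below (an illustrative sketch; the field names are just examples, and `s` is the connection object from the earlier examples):

```python
df = s.search_df(
    spl="search index=win_events EventCode=4688",
    internal_fields="_bkt,_cd,_indextime,_serial,_si,_sourcetype"
)
```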

Splunk's Python API can be quite slow, so to speed things up you may elect to spread result retrieval across multiple cores. The default is one extra core, but you can use the `processes` argument to `search()` or `search_df()` to set this higher if you like.

```python
df = s.search_df(
spl="search index=win_events EventCode=4688",
processes=4
)
```

If you prefer to use all your cores, try something like:

```python
from multiprocessing import cpu_count

df = s.search_df(
spl="search index=win_events EventCode=4688",
processes=cpu_count()
)
```

*NOTE: You may have to experiment to find the optimal number of parallel processes for your specific environment. Maxing out the number of workers doesn't always give the best performance.*
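
One simple way to experiment is to time the same search at several worker counts and compare (a rough sketch, assuming the `s` connection and example search used above):

```python
import time

for procs in (1, 2, 4, 8):
    start = time.perf_counter()
    df = s.search_df(
        spl="search index=win_events EventCode=4688",
        processes=procs
    )
    elapsed = time.perf_counter() - start
    print(f"{procs} processes: {elapsed:.1f}s for {len(df)} rows")
```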

## Data Module

12 changes: 12 additions & 0 deletions huntlib/__init__.py
@@ -8,6 +8,8 @@
import math
from jellyfish import levenshtein_distance, damerau_levenshtein_distance, hamming_distance, jaro_similarity, jaro_winkler_similarity
import sys
import platform
import multiprocessing

__all__ = ['entropy', 'entropy_per_byte', 'promptCreds', 'edit_distance']

@@ -92,3 +94,13 @@ def edit_distance(str1, str2, method="damerau-levenshtein"):
str2 = unicode(str2)

return distance_function(str1, str2)


# First time initialization on import

# Set Mac OS systems to use the older "fork" method of spawning
# procs for the multiprocessing module. For some reason the newer
# methods don't work (EOFErrors when creating Manager() objects)
system_type = platform.system()
if system_type == "Darwin":
multiprocessing.set_start_method('fork')
2 changes: 1 addition & 1 deletion huntlib/elastic.py
@@ -1,4 +1,4 @@
from huntlib.exceptions import *
from huntlib.exceptions import AuthenticationErrorSearchException, InvalidRequestSearchException, UnknownSearchException
from builtins import object
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search
276 changes: 187 additions & 89 deletions huntlib/splunk.py
@@ -1,12 +1,22 @@
from __future__ import print_function

from huntlib.exceptions import *
import multiprocessing
import multiprocessing.managers
import platform
import time
from builtins import object
from datetime import datetime
from multiprocessing import (Manager, Process, Queue, cpu_count,
set_start_method)
from sys import stderr

import pandas as pd
import splunklib.client as client
import splunklib.results as results
import splunklib.binding
import pandas as pd
from datetime import datetime

from huntlib.exceptions import AuthenticationErrorSearchException


class SplunkDF(object):
'''
@@ -63,9 +73,79 @@ def __init__(self, host=None, username=None, password=None, port=8089):
except splunklib.binding.AuthenticationError:
raise AuthenticationErrorSearchException("Login failed.")

def search(self, spl, mode="normal", search_args=None, verbose=False,
days=None, start_time=None, end_time=None, limit=None,
fields="*", internal_fields=False):
def _retrieve_parallel_worker(self, job, offset_queue, page_size, search_results):

while not offset_queue.empty():
offset = offset_queue.get()

paginate_args = dict(
count=page_size,
offset=offset
)

page_results = job.results(**paginate_args)

for result in results.ResultsReader(page_results):
if isinstance(result, dict):
search_results.append(result)


def _retrieve_parallel(self, job, page_size=1000, processes=4):


manager = Manager()
search_results = manager.list()
offset_queue = Queue()


result_count = int(job['resultCount'])

offset = 0

while (offset < result_count):
offset_queue.put(offset)
offset += page_size

workers = list()

for _ in range(processes):
p = Process(
target=self._retrieve_parallel_worker,
args=(
job,
offset_queue,
page_size,
search_results
)
)

workers.append(p)
p.start()

for p in workers:
p.join()

return list(search_results)

def _process_result(self, result, **kwargs):
if isinstance(result, results.Message):
if kwargs['verbose']:
print(f"Message: {result}")
return None

if isinstance(result, dict):
# Remove internal fields if requested
if kwargs['internal_fields'] is False:
for field in [key for key in result.keys() if key.startswith('_')]:
result.pop(field)
elif isinstance(kwargs['internal_fields'], str):
for field in list(map(lambda x: x.strip(), kwargs['internal_fields'].split(','))):
result.pop(field)

return result


def search(self, *args, **kwargs):
'''
Search Splunk and return the results as a list of dicts.
@@ -96,102 +176,119 @@ def search(self, spl, mode="normal", search_args=None, verbose=False,
from the results. Default is False.
'''

if fields:
spl += f"| fields {fields}"
# Valid argument checking
valid_args = [
'spl',
'mode',
'search_args',
'verbose',
'days',
'start_time',
'end_time',
'limit',
'fields',
'internal_fields',
'processes',
'page_size'
]
for arg in kwargs.keys():
if not arg in valid_args:
raise TypeError(f"search() got an unexpected keyword argument '{arg}'")

# Make sure args contain what we think they should

# Set default values, but only if they don't already exist
kwargs['mode'] = kwargs.get('mode', 'normal')
kwargs['search_args'] = kwargs.get('search_args', None)
kwargs['verbose'] = kwargs.get('verbose', False)
kwargs['days'] = kwargs.get('days', None)
kwargs['start_time'] = kwargs.get('start_time', None)
kwargs['end_time'] = kwargs.get('end_time', None)
kwargs['limit'] = kwargs.get('limit', None)
kwargs['fields'] = kwargs.get('fields', '*')
kwargs['internal_fields'] = kwargs.get('internal_fields', False)
kwargs['processes'] = kwargs.get('processes', 1)
kwargs['page_size'] = kwargs.get('page_size', 1000)

# SPL query can be either the first positional argument or the 'spl' keyword arg
spl = None
if len(args) > 0:
spl = args[0]
elif "spl" in kwargs:
spl = kwargs['spl']

if spl is None:
raise ValueError("You must specify an SPL search string.")

# If present, internal_fields must be either bool or str
if not (isinstance(kwargs['internal_fields'], bool) or isinstance(kwargs['internal_fields'], str)):
raise ValueError(f"internal_fields must be a boolean or a string, not {type(kwargs['internal_fields'])}.")

spl += f"| fields {kwargs['fields']}"

if not search_args or not isinstance(search_args, dict):
search_args = dict()
search_args["search_mode"] = mode
if not kwargs['search_args'] or not isinstance(kwargs['search_args'], dict):
kwargs['search_args'] = dict()
kwargs['search_args']["search_mode"] = kwargs['mode']

if days:
if kwargs['days']:
# Search from current time backwards
search_args["earliest_time"] = "-%dd" % days
kwargs['search_args']["earliest_time"] = "-%dd" % kwargs['days']
else:
if start_time:
if kwargs['start_time']:
# convert to string if it's a datetime
if isinstance(start_time, datetime):
start_time = start_time.isoformat()
search_args["earliest_time"] = start_time
if end_time:
if isinstance(kwargs['start_time'], datetime):
kwargs['start_time'] = kwargs['start_time'].isoformat()
kwargs['search_args']["earliest_time"] = kwargs['start_time']
if kwargs['end_time']:
# convert to string if it's a datetime
if isinstance(end_time, datetime):
end_time = end_time.isoformat()
search_args["latest_time"] = end_time

if limit:
search_args['count'] = limit
# use the "oneshot" job type, since it will accept the 'count'
# argument. Downside is that it's subject to a max result set
# specified in limits.conf on the server, though.
export_results = self.splunk_conn.jobs.oneshot(spl, **search_args)
if isinstance(kwargs['end_time'], datetime):
kwargs['end_time'] = kwargs['end_time'].isoformat()
kwargs['search_args']["latest_time"] = kwargs['end_time']

if kwargs['limit']:

kwargs['search_args']['exec_mode'] = 'blocking'
kwargs['search_args']['search_mode'] = 'normal'
kwargs['search_args']['max_count'] = kwargs['limit']

job = self.splunk_conn.jobs.create(
spl,
**kwargs['search_args']
)

for res in self._retrieve_parallel(job):
res = self._process_result(res, **kwargs)
if res is not None:
yield res
else:
# Use the "export" job type, since that's the most reliable way to
# return possibly large result sets, with no apparent limits
export_results = self.splunk_conn.jobs.export(spl, **search_args)


reader = results.ResultsReader(export_results)

for res in reader:
if isinstance(res, dict):
# Remove internal fields if requested
if internal_fields is True:
pass
elif internal_fields is False:
for field in [key for key in res.keys() if key.startswith('_')]:
res.pop(field)
elif isinstance(internal_fields, str):
for field in list(map(lambda x: x.strip(), internal_fields.split(','))):
res.pop(field)
else:
raise ValueError(f"internal_fields must be a boolean or a string, not {type(internal_fields)}.")
yield res
elif isinstance(res, results.Message) and verbose:
print("Message: %s" % res)

def search_df(self, spl, mode="normal", search_args=None, verbose=False,
days=None, start_time=None, end_time=None, normalize=True,
limit=None, fields="*", internal_fields=False):
search_results = self.splunk_conn.jobs.export(
spl,
**kwargs['search_args']
)

reader = results.ResultsReader(search_results)

for res in reader:
res = self._process_result(res, **kwargs)
if res is not None:
yield res

# def search_df(self, spl, mode="normal", search_args=None, verbose=False,
# days=None, start_time=None, end_time=None, normalize=True,
# limit=None, fields="*", internal_fields=False, processes=1,
# page_size=1000):

def search_df(self, *args, **kwargs):
'''
Search Splunk and return the results as a Pandas DataFrame.
spl: A string containing the Splunk search in SPL form
mode: A string specifying the type of Splunk search to run ("normal" or "realtime")
search_args: A dict containing any additional search parameters to pass to
the Splunk server.
days: Search the past X days. If provided, this supersedes both start_time
and end_time.
start_time: A datetime() object representing the start of the search
window, or a string in Splunk syntax (e.g., "-2d@d"). If used
without end_time, the end of the search window is the current time.
end_time: A datetime() object representing the end of the search window, or a
string in Splunk syntax (e.g., "-2d@d"). If used without start_time,
the search start will be the earliest timestamp in Splunk.
verbose: If True, any errors, warnings or other messages encountered
by the search process will be printed to stdout. The default is False
(suppress these messages).
normalize: If set to True, fields containing structures (i.e. subfields)
will be flattened such that each field has its own column in
the dataframe. If False, there will be a single column for the
structure, with a JSON string encoding all the contents.
limit: An integer describing the max number of search results to return.
fields: A comma-separated string listing all of the fields to be returned in
the results. If not 'None', this is appended to the end of the 'spl'
query, like so: "| fields field1,field2,field3". The default is '*',
meaning all fields.
internal_fields: Control whether or not to return Splunk's internal fields.
If set to False, all fields with names beginning with an underscore
will be removed from the results. If set to True, nothing will be removed.
If this is a string, treat it as a comma-separated list of fields to remove
from the results. Default is False.
Accepts all the same arguments as the search() function
'''

normalize = kwargs.get('normalize', True)

results = list()
for hit in self.search(spl=spl, mode=mode,
search_args=search_args, verbose=verbose,
days=days, start_time=start_time,
end_time=end_time, limit=limit,
fields=fields, internal_fields=internal_fields):
for hit in self.search(*args, **kwargs):
results.append(hit)

if normalize:
@@ -200,3 +297,4 @@ def search_df(self, spl, mode="normal", search_args=None, verbose=False,
df = pd.DataFrame(results)

return df

2 changes: 1 addition & 1 deletion setup.py
@@ -6,7 +6,7 @@
long_description = fh.read()

setup(name='huntlib',
version='0.4.1',
version='0.4.5',
description='A Python library to help with some common threat hunting data analysis operations',
long_description=long_description,
long_description_content_type="text/markdown",
1 change: 1 addition & 0 deletions support/certificates/instances.yml
@@ -3,6 +3,7 @@ instances:
dns:
- elastic_test
- localhost
- host.docker.internal
ip:
- 127.0.0.1
