Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added multithreading support for additional connections (+fixes) #645

Merged
merged 27 commits into from
Jul 5, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
b6b5a12
Multithreading support when using multiple connections
d3vzer0 Mar 14, 2023
b25d812
Merge branch 'microsoft:main' into multithreading
d3vzer0 Mar 17, 2023
05bab7d
Renamed additional connection column in results df
d3vzer0 Mar 21, 2023
051f55e
Merge branch 'multithreading' of github.com:d3vzer0/msticpy into mult…
d3vzer0 Mar 21, 2023
3406c9d
Fix flake warning
d3vzer0 Mar 21, 2023
41f8589
Merge branch 'main' into multithreading
ianhelle Mar 22, 2023
ead4cb5
Merge pull request #2 from microsoft/main
d3vzer0 May 5, 2023
3f75c5b
Merge branch 'main' into pr/d3vzer0/645
ianhelle May 11, 2023
1725b05
Adding threaded execution for both multiple instances and split queri…
ianhelle May 16, 2023
08f2236
Merge branch 'main' into multithreading
ianhelle May 16, 2023
d4f56b6
Fixing issue with unit_test_lib not properly isolating temporary sett…
ianhelle May 16, 2023
4ce1f48
Merge branch 'multithreading' of https://github.com/d3vzer0/msticpy i…
ianhelle May 16, 2023
ceab46e
Adding locking around pivot data providers loader to fix config file …
ianhelle May 16, 2023
77aaddd
Merge branch 'main' into multithreading
ianhelle May 16, 2023
b9362ae
Merge branch 'main' into multithreading
ianhelle May 20, 2023
a279b97
Fixing some bugs in multi-threaded code - ensuring that loop is avail…
ianhelle May 23, 2023
a1f9921
Merge branch 'multithreading' of https://github.com/d3vzer0/msticpy i…
ianhelle May 23, 2023
1033330
Merge branch 'main' into pr/d3vzer0/645
ianhelle May 23, 2023
eb5c92a
Fxing handling of datetime/pd.Timestamp in query_provider_connections…
ianhelle May 24, 2023
85598d4
Typo in data_providers (self.logger instead of logger)
ianhelle May 24, 2023
2d1263f
Typo calling logger.info in data_providers.py
ianhelle May 24, 2023
99035d1
Cleaned up and refactored code in query_provider_connections_mixin.py
ianhelle May 25, 2023
3d6deff
Typo in type annotation in query_provider_connections_mixin
ianhelle May 25, 2023
19a9cc1
Removing redundant line in mdatp_driver
ianhelle May 26, 2023
a4dc918
Merge branch 'main' into pr/d3vzer0/645
ianhelle Jul 3, 2023
bd57b04
Merge branch 'main' into multithreading
ianhelle Jul 4, 2023
45fb85b
Bug in commit from merge - missing self._connection_str attribute in …
ianhelle Jul 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 37 additions & 16 deletions msticpy/data/core/data_providers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
# --------------------------------------------------------------------------
"""Data provider loader."""
import re
import concurrent.futures
from collections import abc
from datetime import datetime
from functools import partial
from itertools import tee
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, Iterable, List, NamedTuple, Optional, Pattern, Union

import pandas as pd
Expand Down Expand Up @@ -122,6 +124,7 @@ def __init__( # noqa: MC0001

# Add any query files
data_env_queries: Dict[str, QueryStore] = {}
self._query_paths = query_paths
if driver.use_query_paths:
data_env_queries.update(
self._read_queries_from_paths(query_paths=query_paths)
Expand Down Expand Up @@ -193,12 +196,13 @@ def add_connection(

"""
# create a new instance of the driver class
new_driver = self.driver_class(**(self._driver_kwargs))
# connect
new_driver = QueryProvider(self.environment,
ianhelle marked this conversation as resolved.
Show resolved Hide resolved
self._query_provider,
self._query_paths,
**kwargs)
new_driver.connect(connection_str=connection_str, **kwargs)
# add to collection
driver_key = alias or str(len(self._additional_connections))
self._additional_connections[driver_key] = new_driver
self._additional_connections[driver_key] = new_driver._query_provider

@property
def connected(self) -> bool:
Expand Down Expand Up @@ -449,22 +453,39 @@ def exec_query(self, query: str, **kwargs) -> Union[pd.DataFrame, Any]:
"""
query_options = kwargs.pop("query_options", {}) or kwargs
query_source = kwargs.pop("query_source", None)
result = self._query_provider.query(
query, query_source=query_source, **query_options
)

if not self._additional_connections:
result = self._query_provider.query(
ianhelle marked this conversation as resolved.
Show resolved Hide resolved
query, query_source=query_source, **query_options
)
return result

# run query against all connections
results = [result]
results = []
print(f"Running query for {len(self._additional_connections)} connections.")
for con_name, connection in self._additional_connections.items():
print(f"{con_name}...")
try:
results.append(
connection.query(query, query_source=query_source, **query_options)
)
except MsticpyDataQueryError:
print(f"Query {con_name} failed.")
if self._query_provider._support_async:
with ThreadPoolExecutor(max_workers=self._query_provider.max_threads) as executor:
ianhelle marked this conversation as resolved.
Show resolved Hide resolved
all_tasks = {}
for con_name, connection in self._additional_connections.items():
try:
task = executor.submit(connection.query, query, query_source=query_source, **query_options)
all_tasks[task] = con_name
except MsticpyDataQueryError:
print(f"Query {con_name} failed.")
pass

for future in concurrent.futures.as_completed(all_tasks):
result = future.result()
result['MsticpyInstance'] = all_tasks[future]
results.append(result)
else:
for con_name, connection in self._additional_connections.items():
try:
query_res = connection.query(query, query_source=query_source, **query_options)
query_res['MsticpyInstance'] = con_name
results.append(query_res)
except MsticpyDataQueryError:
print(f"Query {con_name} failed.")
return pd.concat(results)

def browse_queries(self, **kwargs):
Expand Down
2 changes: 2 additions & 0 deletions msticpy/data/drivers/driver_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ def __init__(self, **kwargs):
self.data_environment = kwargs.get("data_environment")
self._query_filter: Dict[str, Set[str]] = defaultdict(set)
self._instance: Optional[str] = None
self._support_async = False
self.max_threads = kwargs.get("max_threads", 4)

@property
def loaded(self) -> bool:
Expand Down
1 change: 1 addition & 0 deletions msticpy/data/drivers/mdatp_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def __init__(self, connection_str: str = None, instance: str = "Default", **kwar

"""
super().__init__(**kwargs)
self._support_async = True
ianhelle marked this conversation as resolved.
Show resolved Hide resolved
cs_dict = _get_driver_settings(
self.CONFIG_NAME, self._ALT_CONFIG_NAMES, instance
)
Expand Down