Skip to content

Commit

Permalink
Fixing tests that depend on GeoLiteLookup - replace with mock class.
Browse files Browse the repository at this point in the history
Temporary workaround for convert_to_ip_entities in host.py
  • Loading branch information
ianhelle committed Mar 24, 2021
1 parent 72d7c59 commit 419cce1
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 23 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ repos:
hooks:
- id: pylint
args:
- --disable=E0401
- --disable=E0401,W0511
- --ignore-patterns=test_
- repo: https://gitlab.com/pycqa/flake8
rev: 3.8.4
Expand Down
107 changes: 102 additions & 5 deletions msticnb/nblib/azsent/host.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@
"""host_network_summary notebooklet."""
from collections import namedtuple
from functools import lru_cache
from typing import Dict, List, Set
from typing import Dict, List, Optional, Set, Union

import pandas as pd
from msticpy.common.timespan import TimeSpan
from msticpy.data import QueryProvider
from msticpy.datamodel import entities
from msticpy.datamodel.entities import IpAddress
from msticpy.sectools.ip_utils import convert_to_ip_entities

from ..._version import VERSION
Expand Down Expand Up @@ -231,12 +232,12 @@ def populate_host_entity(
if df_has_data(vmcomputer_df):
ip_vm = vmcomputer_df.iloc[0] # type: ignore
_extract_vmcomputer(ip_vm, host_entity)
ip_ents = convert_to_ip_entities(data=vmcomputer_df, ip_col="Ipv4Addresses")
ip_ents = _convert_to_ip_entities(data=vmcomputer_df, ip_col="Ipv4Addresses")
ip_entities.extend(
ip_ent for ip_ent in ip_ents if ip_ent.Address not in ip_unique
)
ip_unique |= {ip_ent.Address for ip_ent in ip_ents}
ip_ents = convert_to_ip_entities(data=vmcomputer_df, ip_col="Ipv6Addresses")
ip_ents = _convert_to_ip_entities(data=vmcomputer_df, ip_col="Ipv6Addresses")
ip_entities.extend(
ip_ent for ip_ent in ip_ents if ip_ent.Address not in ip_unique
)
Expand All @@ -246,8 +247,8 @@ def populate_host_entity(
if df_has_data(az_net_df):
if not host_entity.HostName:
host_entity.HostName = az_net_df.iloc[0].Computer # type: ignore
ip_priv = convert_to_ip_entities(data=az_net_df, ip_col="PrivateIPAddresses")
ip_pub = convert_to_ip_entities(data=az_net_df, ip_col="PublicIPAddresses")
ip_priv = _convert_to_ip_entities(data=az_net_df, ip_col="PrivateIPAddresses")
ip_pub = _convert_to_ip_entities(data=az_net_df, ip_col="PublicIPAddresses")
host_entity["PrivateIPAddresses"] = []
host_entity["PrivateIPAddresses"].extend(
ip_ent for ip_ent in ip_priv if ip_ent.Address not in ip_unique
Expand Down Expand Up @@ -324,3 +325,99 @@ def _extract_vmcomputer(ip_vm, host_entity):
"ResourceGroup": ip_vm["AzureResourceGroup"],
"ResourceId": ip_vm["_ResourceId"],
}


# TODO - temporary workaround until iputils.convert_to_ip_entities
# is fixed in MSTICPY
def _convert_to_ip_entities(
ip_str: Optional[str] = None,
data: Optional[pd.DataFrame] = None,
ip_col: Optional[str] = None,
geo_lookup: bool = True,
) -> List[IpAddress]:
"""
Take in an IP Address string and converts it to an IP Entity.
Parameters
----------
ip_str : str
A string with a single IP Address or multiple addresses
delimited by comma or space
data : pd.DataFrame
Use DataFrame as input
ip_col : str
Column containing IP addresses
geo_lookup : bool
If true, do geolocation lookup on IPs,
by default, True
Returns
-------
List
The populated IP entities including address and geo-location
Raises
------
ValueError
If neither ip_string or data/column provided as input
"""
try:
return convert_to_ip_entities(
ip_str=ip_str, data=data, ip_col=ip_col, geo_lookup=geo_lookup
)
except AttributeError:
pass
ip_entities: List[IpAddress] = []
all_ips: Set[str] = set()

if ip_str:
addrs = arg_to_list(ip_str)
elif data is not None and ip_col:
addrs = data[ip_col].values
else:
raise ValueError("Must specify either ip_str or data + ip_col parameters.")

for addr in addrs:
if isinstance(addr, list):
ip_list = set(addr)
elif isinstance(addr, str) and "," in addr:
ip_list = {ip.strip() for ip in addr.split(",")}
else:
ip_list = {addr}
ip_list = ip_list - all_ips # remove IP addresses we've seen
ip_entities.extend(IpAddress(Address=ip) for ip in ip_list)

return ip_entities


def arg_to_list(arg: Union[str, List[str]], delims=",; ") -> List[str]:
"""
Convert an optional list/str/str with delims into a list.
Parameters
----------
arg : Union[str, List[str]]
A string, delimited string or list
delims : str, optional
The default delimiters to use, by default ",; "
Returns
-------
List[str]
List of string components
Raises
------
TypeError
If `arg` is not a string or list
"""
if isinstance(arg, list):
return arg
if isinstance(arg, str):
for char in delims:
if char in arg:
return [item.strip() for item in arg.split(char)]
return [arg]
raise TypeError("`arg` must be a string or a list.")
9 changes: 5 additions & 4 deletions tests/nb/azsent/account/test_account_summary.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,21 @@
import pandas as pd
from bokeh.models import LayoutDOM
from msticnb import nblts
from msticnb.data_providers import init
from msticnb import data_providers
from msticpy.common.timespan import TimeSpan
from msticpy.datamodel import entities
from msticpy.nbtools import nbwidgets

from ....unit_test_lib import TEST_DATA_PATH
from ....unit_test_lib import TEST_DATA_PATH, GeoIPLiteMock

# pylint: disable=protected-access, no-member


def test_account_summary_notebooklet():
def test_account_summary_notebooklet(monkeypatch):
"""Test basic run of notebooklet."""
test_data = str(Path(TEST_DATA_PATH).absolute())
init(
monkeypatch.setattr(data_providers, "GeoLiteLookup", GeoIPLiteMock)
data_providers.init(
query_provider="LocalData",
LocalData_data_paths=[test_data],
LocalData_query_paths=[test_data],
Expand Down
9 changes: 5 additions & 4 deletions tests/nb/azsent/host/test_host_summary.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,19 @@
import pandas as pd
import pytest_check as check
from msticnb import nblts
from msticnb.data_providers import init
from msticnb import data_providers
from msticpy.common.timespan import TimeSpan

from ....unit_test_lib import TEST_DATA_PATH
from ....unit_test_lib import TEST_DATA_PATH, GeoIPLiteMock

# pylint: disable=no-member


def test_host_summary_notebooklet():
def test_host_summary_notebooklet(monkeypatch):
"""Test basic run of notebooklet."""
monkeypatch.setattr(data_providers, "GeoLiteLookup", GeoIPLiteMock)
test_data = str(Path(TEST_DATA_PATH).absolute())
init(
data_providers.init(
query_provider="LocalData",
LocalData_data_paths=[test_data],
LocalData_query_paths=[test_data],
Expand Down
9 changes: 5 additions & 4 deletions tests/nb/azsent/host/test_win_host_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,19 @@

from msticpy.common.timespan import TimeSpan
from msticnb import nblts
from msticnb.data_providers import init
from msticnb import data_providers

from ....unit_test_lib import TEST_DATA_PATH
from ....unit_test_lib import TEST_DATA_PATH, GeoIPLiteMock


# pylint: disable=no-member


def test_winhostevents_notebooklet():
def test_winhostevents_notebooklet(monkeypatch):
"""Test basic run of notebooklet."""
test_data = str(Path(TEST_DATA_PATH).absolute())
init(
monkeypatch.setattr(data_providers, "GeoLiteLookup", GeoIPLiteMock)
data_providers.init(
query_provider="LocalData",
LocalData_data_paths=[test_data],
LocalData_query_paths=[test_data],
Expand Down
9 changes: 5 additions & 4 deletions tests/nb/template/test_nb_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,16 @@
import pandas as pd

from msticpy.common.timespan import TimeSpan
from msticnb.data_providers import init
from msticnb import data_providers
from msticnb.nb.template.nb_template import TemplateNB
from ...unit_test_lib import TEST_DATA_PATH
from ...unit_test_lib import TEST_DATA_PATH, GeoIPLiteMock


def test_template_notebooklet():
def test_template_notebooklet(monkeypatch):
"""Test basic run of notebooklet."""
test_data = str(Path(TEST_DATA_PATH).absolute())
init(
monkeypatch.setattr(data_providers, "GeoLiteLookup", GeoIPLiteMock)
data_providers.init(
query_provider="LocalData",
LocalData_data_paths=[test_data],
LocalData_query_paths=[test_data],
Expand Down
1 change: 0 additions & 1 deletion tests/test_dataprovider.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

import pytest_check as check
from msticpy.data import QueryProvider
from msticpy.sectools.geoip import GeoLiteLookup
from msticpy.sectools import TILookup

from msticnb import data_providers
Expand Down

0 comments on commit 419cce1

Please sign in to comment.