Skip to content

Commit

Permalink
Merge branch '3234-plugin' into develop
Browse files Browse the repository at this point in the history
Issue #3234
PR #3304
  • Loading branch information
mssalvatore committed May 5, 2023
2 parents 47dc109 + 808b503 commit 66d37c6
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 26 deletions.
111 changes: 111 additions & 0 deletions monkey/agent_plugins/exploiters/snmp/src/plugin.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import logging
from functools import partial
from pprint import pformat
from typing import Any, Dict, Sequence

# common imports
from common.event_queue import IAgentEventPublisher
from common.types import AgentID, Event
from common.utils.code_utils import del_key

# dependencies to get rid of or internalize
from infection_monkey.exploit import IAgentBinaryRepository, IAgentOTPProvider
from infection_monkey.exploit.tools import all_udp_ports_are_closed
from infection_monkey.exploit.tools.http_agent_binary_server import start_agent_binary_server
from infection_monkey.i_puppet import ExploiterResultData, TargetHost
from infection_monkey.network import TCPPortSelector
from infection_monkey.propagation_credentials_repository import IPropagationCredentialsRepository

logger = logging.getLogger(__name__)


class StubSNMPOptions:
def __init__(self, *args, **kwargs):
pass


class StubSNMPExploiter:
def __init__(self, *args, **kwargs):
pass

def exploit_host(self, *args, **kwargs):
pass


class StubSNMPExploitClient:
def __init__(self, *args, **kwargs):
pass


SNMP_PORTS = [161]


def should_attempt_exploit(host: TargetHost) -> bool:
return not all_udp_ports_are_closed(host, SNMP_PORTS)


class Plugin:
def __init__(
self,
*,
plugin_name: str,
agent_id: AgentID,
agent_event_publisher: IAgentEventPublisher,
agent_binary_repository: IAgentBinaryRepository,
propagation_credentials_repository: IPropagationCredentialsRepository,
tcp_port_selector: TCPPortSelector,
otp_provider: IAgentOTPProvider,
**kwargs,
):
exploit_client = StubSNMPExploitClient(agent_id, agent_event_publisher)
agent_binary_server_factory = partial(
start_agent_binary_server,
agent_binary_repository=agent_binary_repository,
tcp_port_selector=tcp_port_selector,
)

self._snmp_exploiter = StubSNMPExploiter(
agent_id,
exploit_client,
agent_binary_server_factory,
propagation_credentials_repository,
otp_provider,
)

def run(
self,
*,
host: TargetHost,
servers: Sequence[str],
current_depth: int,
options: Dict[str, Any],
interrupt: Event,
**kwargs,
) -> ExploiterResultData:
# HTTP ports options are hack because they are needed in fingerprinters
del_key(options, "http_ports")

try:
logger.debug(f"Parsing options: {pformat(options)}")
snmp_options = StubSNMPOptions(**options)
except Exception as err:
msg = f"Failed to parse SNMP options: {err}"
logger.exception(msg)
return ExploiterResultData(error_message=msg)

if not should_attempt_exploit(host):
msg = f"Host {host.ip} has no open SNMP ports"
logger.debug(msg)
return ExploiterResultData(
exploitation_success=False, propagation_success=False, error_message=msg
)

try:
logger.debug(f"Running SNMP exploiter on host {host.ip}")
return self._snmp_exploiter.exploit_host(
host, servers, current_depth, snmp_options, interrupt
)
except Exception as err:
msg = f"An unexpected exception occurred while attempting to exploit host: {err}"
logger.exception(msg)
return ExploiterResultData(error_message=msg)
2 changes: 1 addition & 1 deletion monkey/infection_monkey/exploit/tools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@
from .i_remote_access_client_factory import IRemoteAccessClientFactory
from .brute_force_credentials_provider import BruteForceCredentialsProvider
from .brute_force_exploiter import BruteForceExploiter
from .utils import all_exploitation_ports_are_closed
from .utils import all_exploitation_ports_are_closed, all_udp_ports_are_closed
13 changes: 13 additions & 0 deletions monkey/infection_monkey/exploit/tools/utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,21 @@
from typing import Sequence

from common.types import NetworkPort
from infection_monkey.i_puppet import TargetHost


def all_exploitation_ports_are_closed(host: TargetHost, exploitation_ports: Sequence[int]) -> bool:
closed_tcp_ports = host.ports_status.tcp_ports.closed
return all([p in closed_tcp_ports for p in exploitation_ports])


def all_udp_ports_are_closed(host: TargetHost, udp_ports: Sequence[NetworkPort]) -> bool:
"""
Check if all UDP ports in a given sequence are closed on the host
:param host: The host to check
:param udp_ports: The sequence of UDP ports to check
:return: True if all ports are closed, False otherwise
"""
closed_udp_ports = host.ports_status.udp_ports.closed
return all([p in closed_udp_ports for p in udp_ports])
Original file line number Diff line number Diff line change
@@ -1,22 +1,34 @@
from ipaddress import IPv4Address
from typing import List

import pytest

from common import OperatingSystem
from common.types import PortStatus
from infection_monkey.exploit.tools import all_exploitation_ports_are_closed
from common.types import NetworkPort, PortStatus
from infection_monkey.exploit.tools import (
all_exploitation_ports_are_closed,
all_udp_ports_are_closed,
)
from infection_monkey.i_puppet import PortScanData, PortScanDataDict, TargetHost, TargetHostPorts

TARGET_IP = IPv4Address("127.0.0.1")
PORTS_STATUS = TargetHostPorts(
tcp_ports=PortScanDataDict(
{
5000: PortScanData(port=5000, status=PortStatus.OPEN),
5001: PortScanData(port=5001, status=PortStatus.CLOSED),
6000: PortScanData(port=6000, status=PortStatus.OPEN),
6001: PortScanData(port=6001, status=PortStatus.CLOSED),
NetworkPort(5000): PortScanData(port=5000, status=PortStatus.OPEN),
NetworkPort(5001): PortScanData(port=5001, status=PortStatus.CLOSED),
NetworkPort(6000): PortScanData(port=6000, status=PortStatus.OPEN),
NetworkPort(6001): PortScanData(port=6001, status=PortStatus.CLOSED),
}
)
),
udp_ports=PortScanDataDict(
{
NetworkPort(7000): PortScanData(port=7000, status=PortStatus.OPEN),
NetworkPort(7001): PortScanData(port=7001, status=PortStatus.CLOSED),
NetworkPort(8000): PortScanData(port=8000, status=PortStatus.OPEN),
NetworkPort(8001): PortScanData(port=8001, status=PortStatus.CLOSED),
}
),
)


Expand All @@ -29,36 +41,51 @@ def target_host() -> TargetHost:
)


def test_all_exploitation_ports_open(target_host):
exploitation_ports = [5000, 6000]

assert not all_exploitation_ports_are_closed(target_host, exploitation_ports)


def test_some_exploitation_ports_open(target_host):
exploitation_ports = [5000, 5001, 6000, 6001]
@pytest.mark.parametrize(
"all_ports_are_closed, ports_to_check",
[(all_exploitation_ports_are_closed, [5000, 6000]), (all_udp_ports_are_closed, [7000, 8000])],
)
def test_all_exploitation_ports_open(target_host, all_ports_are_closed, ports_to_check):
assert not all_ports_are_closed(target_host, ports_to_check)

assert not all_exploitation_ports_are_closed(target_host, exploitation_ports)

@pytest.mark.parametrize(
"all_ports_are_closed, ports_to_check",
[
(all_exploitation_ports_are_closed, [5000, 5001, 6000, 6001]),
(all_udp_ports_are_closed, [7000, 7001, 8000, 8001]),
],
)
def test_some_exploitation_ports_open(target_host, all_ports_are_closed, ports_to_check):
assert not all_ports_are_closed(target_host, ports_to_check)

def test_all_exploitation_ports_closed(target_host):
exploitation_ports = [5001, 6001]

assert all_exploitation_ports_are_closed(target_host, exploitation_ports)
@pytest.mark.parametrize(
"all_ports_are_closed, ports_to_check",
[(all_exploitation_ports_are_closed, [5001, 6001]), (all_udp_ports_are_closed, [7001, 8001])],
)
def test_all_exploitation_ports_closed(target_host, all_ports_are_closed, ports_to_check):
assert all_ports_are_closed(target_host, ports_to_check)


def test_no_exploitation_ports_exist(target_host):
exploitation_ports = []
@pytest.mark.parametrize(
"all_ports_are_closed", [(all_exploitation_ports_are_closed), (all_udp_ports_are_closed)]
)
def test_no_exploitation_ports_exist(target_host, all_ports_are_closed):
exploitation_ports: List[NetworkPort] = []

assert all_exploitation_ports_are_closed(target_host, exploitation_ports)
assert all_ports_are_closed(target_host, exploitation_ports)


def test_host_has_no_ports_status():
@pytest.mark.parametrize(
"all_ports_are_closed, ports_to_check",
[(all_exploitation_ports_are_closed, [5000, 5001]), (all_udp_ports_are_closed, [7000, 7001])],
)
def test_host_has_no_ports_status(all_ports_are_closed, ports_to_check):
target_host = TargetHost(
ip=TARGET_IP,
operating_system=OperatingSystem.WINDOWS,
ports_status=PortScanDataDict({}),
)
exploitation_ports = [5000, 5001]

assert not all_exploitation_ports_are_closed(target_host, exploitation_ports)
assert not all_ports_are_closed(target_host, ports_to_check)

0 comments on commit 66d37c6

Please sign in to comment.