From f858728cca1c18b9b7d505811c17dc3d6d73a8fe Mon Sep 17 00:00:00 2001
From: Limmen
Date: Mon, 13 May 2024 17:54:50 +0200
Subject: [PATCH] posg solve example

---
 examples/training/posg_solve/README.md        |  21 +++
 .../run_vs_random_attacker_v_001.py           |  40 +++++
 .../dao/intrusion_recovery_game_config.py     | 123 +++++++++++++
 .../util/intrusion_recovery_pomdp_util.py     | 162 +++++++++++++++++-
 4 files changed, 345 insertions(+), 1 deletion(-)
 create mode 100644 examples/training/posg_solve/README.md
 create mode 100644 examples/training/posg_solve/intrusion_recovery_pomdp/run_vs_random_attacker_v_001.py
 create mode 100644 simulation-system/libs/csle-tolerance/src/csle_tolerance/dao/intrusion_recovery_game_config.py

diff --git a/examples/training/posg_solve/README.md b/examples/training/posg_solve/README.md
new file mode 100644
index 000000000..e042269fd
--- /dev/null
+++ b/examples/training/posg_solve/README.md
@@ -0,0 +1,21 @@
+# HSVI C++
+
+This directory contains example scripts for solving OS-POSGs using [HSVI](https://www.sciencedirect.com/science/article/pii/S0004370222001783).
+
+Command for running HSVI with the game file "apt_game.posg", epsilon 0.01 (the target precision),
+pDelta 4 (the presolve delta, which determines the length of the presolve phase), and pLimit 2000 (the presolve time limit):
+```bash
+./StochasticGamesCpp games/apt_game.posg 0.01 4 2000
+```
+
+## Author & Maintainer
+
+Kim Hammar
+
+## Copyright and license
+
+[LICENSE](../../../LICENSE.md)
+
+Creative Commons
+
+(C) 2020-2024, Kim Hammar
\ No newline at end of file
diff --git a/examples/training/posg_solve/intrusion_recovery_pomdp/run_vs_random_attacker_v_001.py b/examples/training/posg_solve/intrusion_recovery_pomdp/run_vs_random_attacker_v_001.py
new file mode 100644
index 000000000..016d1b1ca
--- /dev/null
+++ b/examples/training/posg_solve/intrusion_recovery_pomdp/run_vs_random_attacker_v_001.py
@@ -0,0 +1,40 @@
+import numpy as np
+from csle_tolerance.dao.intrusion_recovery_game_config import IntrusionRecoveryGameConfig
+from csle_tolerance.util.intrusion_recovery_pomdp_util import IntrusionRecoveryPomdpUtil
+
+if __name__ == '__main__':
+    eta = 8
+    p_a = 1
+    p_c_1 = 0.01
+    BTR = np.inf
+    negate_costs = False
+    discount_factor = 0.999
+    num_observations = 10
+    simulation_name = "csle-tolerance-intrusion-recovery-pomdp-defender-001"
+    cost_tensor = IntrusionRecoveryPomdpUtil.cost_tensor(eta=eta, states=IntrusionRecoveryPomdpUtil.state_space(),
+                                                         actions=IntrusionRecoveryPomdpUtil.action_space(),
+                                                         negate=negate_costs)
+    observation_tensor = IntrusionRecoveryPomdpUtil.observation_tensor(
+        states=IntrusionRecoveryPomdpUtil.state_space(),
+        observations=IntrusionRecoveryPomdpUtil.observation_space(num_observations=num_observations))
+    transition_tensor = IntrusionRecoveryPomdpUtil.transition_tensor_game(
+        states=IntrusionRecoveryPomdpUtil.state_space(), defender_actions=IntrusionRecoveryPomdpUtil.action_space(),
+        attacker_actions=IntrusionRecoveryPomdpUtil.action_space(), p_a=p_a, p_c_1=p_c_1)
+    config = IntrusionRecoveryGameConfig(
+        eta=eta, p_a=p_a, p_c_1=p_c_1, BTR=BTR, negate_costs=negate_costs, seed=999,
+        discount_factor=discount_factor, states=IntrusionRecoveryPomdpUtil.state_space(),
+        actions=IntrusionRecoveryPomdpUtil.action_space(),
+        observations=IntrusionRecoveryPomdpUtil.observation_space(num_observations=num_observations),
+        cost_tensor=cost_tensor, observation_tensor=observation_tensor, transition_tensor=transition_tensor,
+        b1=IntrusionRecoveryPomdpUtil.initial_belief(p_a=p_a), T=BTR,
+        simulation_env_name=simulation_name, gym_env_name="csle-tolerance-intrusion-recovery-pomdp-v1"
+    )
+
+    # s = 0
+    # for i in range(100):
+    #     s = IntrusionRecoveryPomdpUtil.sample_next_state_game(transition_tensor=config.transition_tensor, s=s,
+    #                                                           a1=0, a2=1)
+    #     c = config.cost_tensor[0][s]
+    #     print(f"cost: {c}, s: {s}")
+
+    IntrusionRecoveryPomdpUtil.generate_os_posg_game_file(game_config=config)
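The commented-out block in the script above sketches a rollout; the following is a runnable variant (a minimal sketch, assuming the `config` object constructed in this script) that plays WAIT against a persistent ATTACK and accumulates the discounted cost:

```python
# Minimal rollout sketch (illustrative): the defender always plays WAIT (a1=0),
# the attacker always plays ATTACK (a2=1); accumulate the discounted cost.
s = 0  # start in the healthy state
total_cost = 0.0
for t in range(100):
    s = IntrusionRecoveryPomdpUtil.sample_next_state_game(
        transition_tensor=config.transition_tensor, s=s, a1=0, a2=1)
    total_cost += config.discount_factor ** t * config.cost_tensor[0][s]
print(f"discounted rollout cost: {total_cost:.3f}, final state: {s}")
```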
diff --git a/simulation-system/libs/csle-tolerance/src/csle_tolerance/dao/intrusion_recovery_game_config.py b/simulation-system/libs/csle-tolerance/src/csle_tolerance/dao/intrusion_recovery_game_config.py
new file mode 100644
index 000000000..5537e34b7
--- /dev/null
+++ b/simulation-system/libs/csle-tolerance/src/csle_tolerance/dao/intrusion_recovery_game_config.py
@@ -0,0 +1,123 @@
+from typing import List, Dict, Any
+import numpy as np
+from csle_common.dao.simulation_config.simulation_env_input_config import SimulationEnvInputConfig
+
+
+class IntrusionRecoveryGameConfig(SimulationEnvInputConfig):
+    """
+    DTO containing the configuration of an intrusion recovery POSG
+    """
+
+    def __init__(self, eta: float, p_a: float, p_c_1: float, BTR: int, negate_costs: bool,
+                 seed: int, discount_factor: float, states: List[int], actions: List[int], observations: List[int],
+                 cost_tensor: List[List[float]], observation_tensor: List[List[float]],
+                 transition_tensor: List[List[List[List[float]]]], b1: List[float], T: int, simulation_env_name: str,
+                 gym_env_name: str, max_horizon: float = np.inf) -> None:
+        """
+        Initializes the DTO
+
+        :param eta: the scaling factor for the cost function
+        :param p_a: the intrusion probability
+        :param p_c_1: the crash probability in the healthy state
+        :param BTR: the periodic recovery interval
+        :param negate_costs: boolean flag indicating whether costs should be negated or not
+        :param seed: the random seed
+        :param discount_factor: the discount factor
+        :param states: the list of states
+        :param actions: the list of actions
+        :param observations: the list of observations
+        :param cost_tensor: the cost tensor
+        :param observation_tensor: the observation tensor
+        :param transition_tensor: the transition tensor
+        :param b1: the initial belief
+        :param T: the time horizon
+        :param simulation_env_name: name of the simulation environment
+        :param gym_env_name: name of the gym environment
+        :param max_horizon: the maximum horizon to avoid infinite simulations
+        """
+        self.eta = eta
+        self.p_a = p_a
+        self.p_c_1 = p_c_1
+        self.BTR = BTR
+        self.negate_costs = negate_costs
+        self.seed = seed
+        self.discount_factor = discount_factor
+        self.states = states
+        self.actions = actions
+        self.observations = observations
+        self.cost_tensor = cost_tensor
+        self.observation_tensor = observation_tensor
+        self.transition_tensor = transition_tensor
+        self.b1 = b1
+        self.T = T
+        self.simulation_env_name = simulation_env_name
+        self.gym_env_name = gym_env_name
+        self.max_horizon = max_horizon
+
+    def __str__(self) -> str:
+        """
+        :return: a string representation of the DTO
+        """
+        return (f"eta: {self.eta}, p_a: {self.p_a}, p_c_1: {self.p_c_1}, "
+                f"BTR: {self.BTR}, negate_costs: {self.negate_costs}, seed: {self.seed}, "
+                f"discount_factor: {self.discount_factor}, states: {self.states}, actions: {self.actions}, "
+                f"observations: {self.observations}, cost_tensor: {self.cost_tensor}, "
+                f"observation_tensor: {self.observation_tensor}, transition_tensor: {self.transition_tensor}, "
+                f"b1: {self.b1}, T: {self.T}, simulation_env_name: {self.simulation_env_name}, "
+                f"gym_env_name: {self.gym_env_name}, max_horizon: {self.max_horizon}")
+
+    @staticmethod
+    def from_dict(d: Dict[str, Any]) -> "IntrusionRecoveryGameConfig":
+        """
+        Converts a dict representation to an instance
+
+        :param d: the dict to convert
+        :return: the created instance
+        """
+        dto = IntrusionRecoveryGameConfig(
+            eta=d["eta"], p_a=d["p_a"], p_c_1=d["p_c_1"], BTR=d["BTR"],
+            negate_costs=d["negate_costs"], seed=d["seed"], discount_factor=d["discount_factor"], states=d["states"],
+            actions=d["actions"], observations=d["observations"], cost_tensor=d["cost_tensor"],
+            observation_tensor=d["observation_tensor"], transition_tensor=d["transition_tensor"], b1=d["b1"],
+            T=d["T"], simulation_env_name=d["simulation_env_name"], gym_env_name=d["gym_env_name"])
+        return dto
+
+    def to_dict(self) -> Dict[str, Any]:
+        """
+        Gets a dict representation of the object
+
+        :return: A dict representation of the object
+        """
+        d: Dict[str, Any] = {}
+        d["eta"] = self.eta
+        d["p_a"] = self.p_a
+        d["p_c_1"] = self.p_c_1
+        d["BTR"] = self.BTR
+        d["negate_costs"] = self.negate_costs
+        d["seed"] = self.seed
+        d["discount_factor"] = self.discount_factor
+        d["states"] = self.states
+        d["actions"] = self.actions
+        d["observations"] = self.observations
+        d["cost_tensor"] = self.cost_tensor
+        d["observation_tensor"] = self.observation_tensor
+        d["transition_tensor"] = self.transition_tensor
+        d["b1"] = self.b1
+        d["T"] = self.T
+        d["simulation_env_name"] = self.simulation_env_name
+        d["gym_env_name"] = self.gym_env_name
+        return d
+
+    @staticmethod
+    def from_json_file(json_file_path: str) -> "IntrusionRecoveryGameConfig":
+        """
+        Reads a json file and converts it to a DTO
+
+        :param json_file_path: the json file path
+        :return: the converted DTO
+        """
+        import io
+        import json
+        with io.open(json_file_path, 'r') as f:
+            json_str = f.read()
+        return IntrusionRecoveryGameConfig.from_dict(json.loads(json_str))
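Since the DTO defines both `to_dict` and `from_json_file`, a quick round-trip check can catch serialization mismatches (a minimal sketch; assumes a `config` instance like the one built in the example script, and an arbitrary file name):

```python
import json
from csle_tolerance.dao.intrusion_recovery_game_config import IntrusionRecoveryGameConfig

# Serialize the config to JSON and read it back through the DTO's own methods.
with open("intrusion_recovery_game_config.json", "w") as f:
    json.dump(config.to_dict(), f)
restored = IntrusionRecoveryGameConfig.from_json_file("intrusion_recovery_game_config.json")
assert restored.gym_env_name == config.gym_env_name
assert restored.transition_tensor == config.transition_tensor
```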
diff --git a/simulation-system/libs/csle-tolerance/src/csle_tolerance/util/intrusion_recovery_pomdp_util.py b/simulation-system/libs/csle-tolerance/src/csle_tolerance/util/intrusion_recovery_pomdp_util.py
index f66aaced2..a6aac5b7e 100644
--- a/simulation-system/libs/csle-tolerance/src/csle_tolerance/util/intrusion_recovery_pomdp_util.py
+++ b/simulation-system/libs/csle-tolerance/src/csle_tolerance/util/intrusion_recovery_pomdp_util.py
@@ -2,6 +2,7 @@
 from scipy.stats import betabinom
 import numpy as np
 from csle_tolerance.dao.intrusion_recovery_pomdp_config import IntrusionRecoveryPomdpConfig
+from csle_tolerance.dao.intrusion_recovery_game_config import IntrusionRecoveryGameConfig
 
 
 class IntrusionRecoveryPomdpUtil:
@@ -26,7 +27,7 @@ def initial_belief(p_a: float) -> List[float]:
         :param p_a: the attack probability
         :return: the initial belief state
         """
-        return [1 - p_a, p_a, 0]
+        return [1, 0, 0]
 
     @staticmethod
     def action_space() -> List[int]:
@@ -165,6 +166,33 @@ def transition_function(s: int, s_prime: int, a: int, p_a: float, p_c_1: float,
         else:
             return 0
 
+    @staticmethod
+    def transition_function_game(s: int, s_prime: int, a1: int, a2: int, p_a: float, p_c_1: float) -> float:
+        """
+        The transition function of the POSG
+
+        :param s: the state
+        :param s_prime: the next state
+        :param a1: the defender action
+        :param a2: the attacker action
+        :param p_a: the intrusion probability
+        :param p_c_1: the crash probability
+        :return: P(s_prime | s, a1, a2)
+        """
+        if s == 2 and s_prime == 2:
+            return 1.0
+        elif s_prime == 2 and s in [0, 1]:
+            return p_c_1
+        elif s_prime == 0 and s == 0 and a2 == 1:
+            return (1 - p_a) * (1 - p_c_1)
+        elif (s_prime == 0 and a2 == 0 and s == 0) or (s_prime == 0 and s == 1 and a1 == 1) \
+                or (s_prime == 1 and s == 1 and a1 == 0):
+            return (1 - p_c_1)
+        elif s_prime == 1 and s == 0 and a2 == 1:
+            return (1 - p_c_1) * p_a
+        else:
+            return 0
+
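A property worth checking is that `transition_function_game` defines a proper conditional distribution for every state and action pair, i.e., the outgoing probabilities sum to one (an illustrative standalone check, assuming the model's 3 states and 2 actions per player):

```python
from csle_tolerance.util.intrusion_recovery_pomdp_util import IntrusionRecoveryPomdpUtil

# Verify that sum over s_prime of P(s_prime | s, a1, a2) == 1 for all (s, a1, a2).
p_a, p_c_1 = 0.2, 0.01
for s in [0, 1, 2]:
    for a1 in [0, 1]:
        for a2 in [0, 1]:
            total = sum(IntrusionRecoveryPomdpUtil.transition_function_game(
                s=s, s_prime=s_prime, a1=a1, a2=a2, p_a=p_a, p_c_1=p_c_1)
                for s_prime in [0, 1, 2])
            assert abs(total - 1.0) < 1e-10, (s, a1, a2, total)
```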
@@ -187,10 +215,39 @@ def transition_tensor(states: List[int], actions: List[int], p_a: float, p_c_1:
             for s_prime in states:
                 s_a_transitions.append(IntrusionRecoveryPomdpUtil.transition_function(
                     s=s, s_prime=s_prime, a=a, p_a=p_a, p_c_1=p_c_1, p_c_2=p_c_2, p_u=p_u))
+            assert round(sum(s_a_transitions), 2) == 1.0
             a_transitions.append(s_a_transitions)
         transition_tensor.append(a_transitions)
     return transition_tensor
 
+    @staticmethod
+    def transition_tensor_game(states: List[int], defender_actions: List[int], attacker_actions: List[int],
+                               p_a: float, p_c_1: float) -> List[List[List[List[float]]]]:
+        """
+        Creates a |A1|x|A2|x|S|x|S| tensor with the transition probabilities of the POSG
+
+        :param states: the list of states
+        :param defender_actions: the list of defender actions
+        :param attacker_actions: the list of attacker actions
+        :param p_a: the intrusion probability
+        :param p_c_1: the crash probability
+        :return: the transition tensor
+        """
+        transition_tensor = []
+        for a1 in defender_actions:
+            a1_transitions = []
+            for a2 in attacker_actions:
+                a2_transitions = []
+                for s in states:
+                    s_a1_a2_transitions = []
+                    for s_prime in states:
+                        s_a1_a2_transitions.append(IntrusionRecoveryPomdpUtil.transition_function_game(
+                            s=s, s_prime=s_prime, a1=a1, a2=a2, p_a=p_a, p_c_1=p_c_1))
+                    a2_transitions.append(s_a1_a2_transitions)
+                a1_transitions.append(a2_transitions)
+            transition_tensor.append(a1_transitions)
+        return transition_tensor
+
     @staticmethod
     def sample_initial_state(b1: List[float]) -> int:
         """
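The same property can be checked on the assembled tensor, together with its expected |A1| x |A2| x |S| x |S| shape (a sketch, assuming `state_space()` and `action_space()` return the 3-state and 2-action spaces used in this model):

```python
import numpy as np
from csle_tolerance.util.intrusion_recovery_pomdp_util import IntrusionRecoveryPomdpUtil

# Build the game transition tensor and verify its shape and row-stochasticity.
T = np.array(IntrusionRecoveryPomdpUtil.transition_tensor_game(
    states=IntrusionRecoveryPomdpUtil.state_space(),
    defender_actions=IntrusionRecoveryPomdpUtil.action_space(),
    attacker_actions=IntrusionRecoveryPomdpUtil.action_space(),
    p_a=0.2, p_c_1=0.01))
assert T.shape == (2, 2, 3, 3)  # |A1| x |A2| x |S| x |S|
assert np.allclose(T.sum(axis=-1), 1.0)  # every row is a probability distribution
```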
@@ -217,6 +274,20 @@ def sample_next_observation(observation_tensor: List[List[float]], s_prime: int,
         o = np.random.choice(np.arange(0, len(observations)), p=observation_probs)
         return int(o)
 
+    @staticmethod
+    def sample_next_state_game(transition_tensor: List[List[List[List[float]]]], s: int, a1: int, a2: int) -> int:
+        """
+        Samples the next state of the game
+
+        :param s: the current state
+        :param a1: the defender action
+        :param a2: the attacker action
+        :param transition_tensor: the transition tensor
+        :return: the next state
+        """
+        s_prime = np.random.choice(np.arange(0, len(transition_tensor[a1][a2][s])), p=transition_tensor[a1][a2][s])
+        return int(s_prime)
+
     @staticmethod
     def bayes_filter(s_prime: int, o: int, a: int, b: List[float], states: List[int], observations: List[int],
                      observation_tensor: List[List[float]], transition_tensor: List[List[List[float]]]) -> float:
@@ -342,3 +413,92 @@ def pomdp_solver_file(config: IntrusionRecoveryPomdpConfig) -> str:
                     c = config.cost_tensor[a][s]
                     file_str = file_str + f"R: {a} : {s} : {s_prime} : {o} {c:.80f}\n"
         return file_str
+
+    @staticmethod
+    def generate_transitions(game_config: IntrusionRecoveryGameConfig) -> List[str]:
+        """
+        Generates the transition rows of the POSG config file of HSVI
+
+        :param game_config: the game configuration
+        :return: list of transition rows
+        """
+        transitions = []
+        for s in game_config.states:
+            for a1 in game_config.actions:
+                for a2 in game_config.actions:
+                    for s_prime in game_config.states:
+                        for i, _ in enumerate(game_config.observations):
+                            tr_prob = game_config.transition_tensor[a1][a2][s][s_prime]
+                            obs_prob = game_config.observation_tensor[a2][i]
+                            prob = tr_prob * obs_prob
+                            if prob > 0:
+                                transition = f"{s} {a1} {a2} {i} {s_prime} {prob}"
+                                transitions.append(transition)
+        return transitions
+
+    @staticmethod
+    def generate_rewards(game_config: IntrusionRecoveryGameConfig) -> List[str]:
+        """
+        Generates the reward rows of the POSG config file of HSVI
+
+        :param game_config: the game configuration
+        :return: list of reward rows
+        """
+        rewards = []
+        for s in game_config.states:
+            for a1 in game_config.actions:
+                for a2 in game_config.actions:
+                    r = -game_config.cost_tensor[a1][s]
+                    if r != 0:
+                        rew = f"{s} {a1} {a2} {r}"
+                        rewards.append(rew)
+        return rewards
+
+    @staticmethod
+    def generate_os_posg_game_file(game_config: IntrusionRecoveryGameConfig) -> str:
+        """
+        Generates the POSG game file for HSVI
+
+        :param game_config: the game configuration
+        :return: a string with the contents of the config file
+        """
+        num_partitions = 1
+        transitions = IntrusionRecoveryPomdpUtil.generate_transitions(game_config=game_config)
+        rewards = IntrusionRecoveryPomdpUtil.generate_rewards(game_config=game_config)
+        game_description = f"{len(game_config.states)} {num_partitions} {len(game_config.actions)} " \
+                           f"{len(game_config.actions)} " \
+                           f"{len(game_config.observations)} {len(transitions)} " \
+                           f"{len(rewards)} {game_config.discount_factor}"
+        state_descriptions = []
+        for s in game_config.states:
+            state_descriptions.append(f"{s} {0}")
+        player_1_actions = ["WAIT", "RECOVER"]
+        player_2_actions = ["FALSEALARM", "ATTACK"]
+
+        player_2_legal_actions = []
+        for _ in game_config.states:
+            player_2_legal_actions.append(" ".join(list(map(lambda x: str(x), game_config.actions))))
+
+        player_1_legal_actions = []
+        player_1_legal_actions.append(" ".join(list(map(lambda x: str(x), game_config.actions))))
+
+        obs_descriptions = []
+        for o in game_config.observations:
+            obs_descriptions.append(f"o_{o}")
+
+        initial_belief_str = f"{0} {' '.join(list(map(lambda x: str(x), game_config.b1)))}"
+        game_file_str = ""
+        game_file_str = game_file_str + game_description + "\n"
+        game_file_str = game_file_str + "\n".join(state_descriptions) + "\n"
+        game_file_str = game_file_str + "\n".join(player_1_actions) + "\n"
+        game_file_str = game_file_str + "\n".join(player_2_actions) + "\n"
+        game_file_str = game_file_str + "\n".join(obs_descriptions) + "\n"
+        game_file_str = game_file_str + "\n".join(player_2_legal_actions) + "\n"
+        game_file_str = game_file_str + "\n".join(player_1_legal_actions) + "\n"
+        game_file_str = game_file_str + "\n".join(transitions) + "\n"
+        game_file_str = game_file_str + "\n".join(rewards) + "\n"
+        game_file_str = game_file_str + initial_belief_str
+        with open('recovery_game.txt', 'w') as f:
+            f.write(game_file_str)
+        return game_file_str
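As a final sanity check, the header line of the generated game file declares the sizes that HSVI expects; it can be parsed back and inspected (a minimal sketch, assuming `generate_os_posg_game_file` has written `recovery_game.txt` to the current directory):

```python
# Parse the header of the generated HSVI game file and print its declared sizes.
with open("recovery_game.txt", "r") as f:
    lines = f.read().splitlines()
(num_states, num_partitions, num_a1, num_a2,
 num_obs, num_transitions, num_rewards, gamma) = lines[0].split()
print(f"states: {num_states}, partitions: {num_partitions}, "
      f"defender actions: {num_a1}, attacker actions: {num_a2}, "
      f"observations: {num_obs}, transitions: {num_transitions}, "
      f"rewards: {num_rewards}, discount: {gamma}")
```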