Skip to content

Commit

Permalink
posg solve example
Browse files Browse the repository at this point in the history
  • Loading branch information
Limmen committed May 13, 2024
1 parent 3c1b4a7 commit f858728
Show file tree
Hide file tree
Showing 4 changed files with 345 additions and 1 deletion.
21 changes: 21 additions & 0 deletions examples/training/posg_solve/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# HSVI c++

This directory contains example scripts for solving OS-POSGs using [hsvi](https://www.sciencedirect.com/science/article/pii/S0004370222001783).

Command for running hsvi with game file "apt_game.posg", 0.01 epsilon (target precision),
4 pDelta (presolve delta which determined the lenght of the presolve phase), and 2000 pLimit (presolve time-limit)
```bash
./StochasticGamesCpp games/apt_game.posg 0.01 4 2000
```

## Author & Maintainer

Kim Hammar <kimham@kth.se>

## Copyright and license

[LICENSE](../../../LICENSE.md)

Creative Commons

(C) 2020-2024, Kim Hammar
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import numpy as np
from csle_tolerance.dao.intrusion_recovery_game_config import IntrusionRecoveryGameConfig
from csle_tolerance.util.intrusion_recovery_pomdp_util import IntrusionRecoveryPomdpUtil

if __name__ == '__main__':
eta = 8
p_a = 1
p_c_1 = 0.01
BTR = np.inf
negate_costs = False
discount_factor = 0.999
num_observations = 10
simulation_name = "csle-tolerance-intrusion-recovery-pomdp-defender-001"
cost_tensor = IntrusionRecoveryPomdpUtil.cost_tensor(eta=eta, states=IntrusionRecoveryPomdpUtil.state_space(),
actions=IntrusionRecoveryPomdpUtil.action_space(),
negate=negate_costs)
observation_tensor = IntrusionRecoveryPomdpUtil.observation_tensor(
states=IntrusionRecoveryPomdpUtil.state_space(),
observations=IntrusionRecoveryPomdpUtil.observation_space(num_observations=num_observations))
transition_tensor = IntrusionRecoveryPomdpUtil.transition_tensor_game(
states=IntrusionRecoveryPomdpUtil.state_space(), defender_actions=IntrusionRecoveryPomdpUtil.action_space(),
attacker_actions=IntrusionRecoveryPomdpUtil.action_space(), p_a=p_a, p_c_1=p_c_1)
config = IntrusionRecoveryGameConfig(
eta=eta, p_a=p_a, p_c_1=p_c_1, BTR=BTR, negate_costs=negate_costs, seed=999,
discount_factor=discount_factor, states=IntrusionRecoveryPomdpUtil.state_space(),
actions=IntrusionRecoveryPomdpUtil.action_space(),
observations=IntrusionRecoveryPomdpUtil.observation_space(num_observations=num_observations),
cost_tensor=cost_tensor, observation_tensor=observation_tensor, transition_tensor=transition_tensor,
b1=IntrusionRecoveryPomdpUtil.initial_belief(p_a=p_a), T=BTR,
simulation_env_name=simulation_name, gym_env_name="csle-tolerance-intrusion-recovery-pomdp-v1"
)

# s = 0
# for i in range(100):
# s = IntrusionRecoveryPomdpUtil.sample_next_state_game(transition_tensor=config.transition_tensor, s=s,
# a1=0, a2=1)
# c = config.cost_tensor[0][s]
# print(f"cost: {c}, s: {s}")

IntrusionRecoveryPomdpUtil.generate_os_posg_game_file(game_config=config)
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
from typing import List, Dict, Any
import numpy as np
from csle_common.dao.simulation_config.simulation_env_input_config import SimulationEnvInputConfig


class IntrusionRecoveryGameConfig(SimulationEnvInputConfig):
"""
DTO containing the configuration of an intrusion recovery POSG
"""

def __init__(self, eta: float, p_a: float, p_c_1: float, BTR: int, negate_costs: bool,
seed: int, discount_factor: float, states: List[int], actions: List[int], observations: List[int],
cost_tensor: List[List[float]], observation_tensor: List[List[float]],
transition_tensor: List[List[List[List[float]]]], b1: List[float], T: int, simulation_env_name: str,
gym_env_name: str, max_horizon: float = np.inf) -> None:
"""
Initializes the DTO
:param eta: the scaling factor for the cost function
:param p_a: the intrusion probability
:param p_c_1: the crash probability in the healthy state
:param BTR: the periodic recovery interval
:param negate_costs: boolean flag indicating whether costs should be negated or not
:param seed: the random seed
:param discount_factor: the discount factor
:param states: the list of states
:param actions: the list of actions
:param observations: the list of observations
:param cost_tensor: the cost tensor
:param observation_tensor: the observation tensor
:param transition_tensor: the transition tensor
:param b1: the initial belief
:param T: the time horizon
:param simulation_env_name: name of the simulation environment
:param gym_env_name: name of the gym environment
:param max_horizon: the maximum horizon to avoid infinie simulations
"""
self.eta = eta
self.p_a = p_a
self.p_c_1 = p_c_1
self.BTR = BTR
self.negate_costs = negate_costs
self.seed = seed
self.discount_factor = discount_factor
self.states = states
self.actions = actions
self.observations = observations
self.cost_tensor = cost_tensor
self.observation_tensor = observation_tensor
self.transition_tensor = transition_tensor
self.b1 = b1
self.T = T
self.simulation_env_name = simulation_env_name
self.gym_env_name = gym_env_name
self.max_horizon = max_horizon

def __str__(self) -> str:
"""
:return: a string representation of the DTO
"""
return (f"eta: {self.eta}, p_a: {self.p_a}, p_c_1: {self.p_c_1},"
f"BTR: {self.BTR}, negate_costs: {self.negate_costs}, seed: {self.seed}, "
f"discount_factor: {self.discount_factor}, states: {self.states}, actions: {self.actions}, "
f"observations: {self.observation_tensor}, cost_tensor: {self.cost_tensor}, "
f"observation_tensor: {self.observation_tensor}, transition_tensor: {self.transition_tensor}, "
f"b1:{self.b1}, T: {self.T}, simulation_env_name: {self.simulation_env_name}, "
f"gym_env_name: {self.gym_env_name}, max_horizon: {self.max_horizon}")

@staticmethod
def from_dict(d: Dict[str, Any]) -> "IntrusionRecoveryGameConfig":
"""
Converts a dict representation to an instance
:param d: the dict to convert
:return: the created instance
"""
dto = IntrusionRecoveryGameConfig(
eta=d["eta"], p_a=d["p_a"], p_c_1=d["p_c_1"], BTR=d["BTR"],
negate_costs=d["negate_costs"], seed=d["seed"], discount_factor=d["discount_factor"], states=d["states"],
actions=d["actions"], observations=d["observations"], cost_tensor=d["cost_tensor"],
observation_tensor=d["observation_tensor"], transition_tensor=d["transition_tensor"], b1=d["b1"],
T=d["T"], simulation_env_name=d["simulation_env_name"], gym_env_name=d["gym_env_name"])
return dto

def to_dict(self) -> Dict[str, Any]:
"""
Gets a dict representation of the object
:return: A dict representation of the object
"""
d: Dict[str, Any] = {}
d["eta"] = self.eta
d["p_a"] = self.p_a
d["p_c_1"] = self.p_c_1
d["BTR"] = self.BTR
d["negate_costs"] = self.negate_costs
d["seed"] = self.seed
d["discount_factor"] = self.discount_factor
d["states"] = self.states
d["actions"] = self.actions
d["observations"] = self.observations
d["cost_tensor"] = self.cost_tensor
d["observation_tensor"] = self.observation_tensor
d["transition_tensor"] = self.transition_tensor
d["b1"] = self.b1
d["T"] = self.T
d["simulation_env_name"] = self.simulation_env_name
d["gym_env_name"] = self.simulation_env_name
return d

@staticmethod
def from_json_file(json_file_path: str) -> "IntrusionRecoveryGameConfig":
"""
Reads a json file and converts it to a DTO
:param json_file_path: the json file path
:return: the converted DTO
"""
import io
import json
with io.open(json_file_path, 'r') as f:
json_str = f.read()
return IntrusionRecoveryGameConfig.from_dict(json.loads(json_str))
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from scipy.stats import betabinom
import numpy as np
from csle_tolerance.dao.intrusion_recovery_pomdp_config import IntrusionRecoveryPomdpConfig
from csle_tolerance.dao.intrusion_recovery_game_config import IntrusionRecoveryGameConfig


class IntrusionRecoveryPomdpUtil:
Expand All @@ -26,7 +27,7 @@ def initial_belief(p_a: float) -> List[float]:
:param p_a: the attack probability
:return: the initial belief state
"""
return [1 - p_a, p_a, 0]
return [1, 0, 0]

@staticmethod
def action_space() -> List[int]:
Expand Down Expand Up @@ -165,6 +166,33 @@ def transition_function(s: int, s_prime: int, a: int, p_a: float, p_c_1: float,
else:
return 0

@staticmethod
def transition_function_game(s: int, s_prime: int, a1: int, a2: int, p_a: float, p_c_1: float) -> float:
"""
The transition function of the POSG
:param s: the state
:param s_prime: the next state
:param a1: the defender action
:param a2: the attacker action
:param p_a: the intrusion probability
:param p_c_1: the crash probability
:return: P(s_prime | s, a1, a2)
"""
if s == 2 and s_prime == 2:
return 1.0
elif s_prime == 2 and s in [0, 1]:
return p_c_1
elif s_prime == 0 and a1 == 0 and a2 == 1 and s == 0:
return (1 - p_a) * (1 - p_c_1)
elif (s_prime == 0 and a2 == 0 and s == 0) or (s_prime == 0 and s == 1 and a1 == 1) \
or (s_prime == 1 and s == 1 and a1 == 0):
return (1 - p_c_1)
elif (s_prime == 1 and s == 0 and a2 == 1):
return (1 - p_c_1) * p_a
else:
return 0

@staticmethod
def transition_tensor(states: List[int], actions: List[int], p_a: float, p_c_1: float, p_c_2: float, p_u: float) \
-> List[List[List[float]]]:
Expand All @@ -187,10 +215,39 @@ def transition_tensor(states: List[int], actions: List[int], p_a: float, p_c_1:
for s_prime in states:
s_a_transitions.append(IntrusionRecoveryPomdpUtil.transition_function(
s=s, s_prime=s_prime, a=a, p_a=p_a, p_c_1=p_c_1, p_c_2=p_c_2, p_u=p_u))
assert round(sum(s_a_transitions), 2) == 1.0
a_transitions.append(s_a_transitions)
transition_tensor.append(a_transitions)
return transition_tensor

@staticmethod
def transition_tensor_game(states: List[int], defender_actions: List[int], attacker_actions: List[int],
p_a: float, p_c_1: float) -> List[List[List[List[float]]]]:
"""
Creates a |A|x|A|x|S|x|S| tensor with the transition probabilities of the POSG
:param states: the list of states
:param defender_actions: the list of defender actions
:param attacker_actions: the list of attacker actions
:param p_a: the intrusion probability
:param p_c_1: the crash probability
:return: the transition tensor
"""
transition_tensor = []
for a1 in defender_actions:
a1_transitions = []
for a2 in attacker_actions:
a2_transitions = []
for s in states:
s_a1_a2_transitions = []
for s_prime in states:
s_a1_a2_transitions.append(IntrusionRecoveryPomdpUtil.transition_function_game(
s=s, s_prime=s_prime, a1=a1, a2=a2, p_a=p_a, p_c_1=p_c_1))
a2_transitions.append(s_a1_a2_transitions)
a1_transitions.append(a2_transitions)
transition_tensor.append(a1_transitions)
return transition_tensor

@staticmethod
def sample_initial_state(b1: List[float]) -> int:
"""
Expand All @@ -217,6 +274,20 @@ def sample_next_observation(observation_tensor: List[List[float]], s_prime: int,
o = np.random.choice(np.arange(0, len(observations)), p=observation_probs)
return int(o)

@staticmethod
def sample_next_state_game(transition_tensor: List[List[List[List[float]]]], s: int, a1: int, a2: int) -> int:
"""
Samples the next observation
:param s: the current state
:param a1: the defender action
:param a2: the attacker action
:param transition_tensor: the transition tensor
:return: the next state a
"""
s_prime = np.random.choice(np.arange(0, len(transition_tensor[a1][a2][s])), p=transition_tensor[a1][a2][s])
return int(s_prime)

@staticmethod
def bayes_filter(s_prime: int, o: int, a: int, b: List[float], states: List[int], observations: List[int],
observation_tensor: List[List[float]], transition_tensor: List[List[List[float]]]) -> float:
Expand Down Expand Up @@ -342,3 +413,92 @@ def pomdp_solver_file(config: IntrusionRecoveryPomdpConfig) -> str:
c = config.cost_tensor[a][s]
file_str = file_str + f"R: {a} : {s} : {s_prime} : {o} {c:.80f}\n"
return file_str

@staticmethod
def generate_transitions(game_config: IntrusionRecoveryGameConfig) -> List[str]:
"""
Generates the transition rows of the POSG config file of HSVI
:param game_config: the game configuration
:return: list of transition rows
"""
transitions = []
for s in game_config.states:
for a1 in game_config.actions:
for a2 in game_config.actions:
for s_prime in game_config.states:
for i, _ in enumerate(game_config.observations):
tr_prob = game_config.transition_tensor[a1][a2][s][s_prime]
obs_prob = game_config.observation_tensor[a2][i]
prob = tr_prob * obs_prob
if prob > 0:
transition = f"{s} {a1} {a2} {i} {s_prime} {prob}"
transitions.append(transition)

return transitions

@staticmethod
def generate_rewards(game_config: IntrusionRecoveryGameConfig) -> List[str]:
"""
Generates the reward rows of the POSG config file of HSVI
:param game_config: the game configuration
:return: list of reward rows
"""
rewards = []
for s in game_config.states:
for a1 in game_config.actions:
for a2 in game_config.actions:
r = -game_config.cost_tensor[a1][s]
if r != 0:
rew = f"{s} {a1} {a2} {r}"
rewards.append(rew)
return rewards

@staticmethod
def generate_os_posg_game_file(game_config: IntrusionRecoveryGameConfig) -> str:
"""
Generates the POSG game file for HSVI
:param game_config: the game configuration
:return: a string with the contents of the config file
"""
num_partitions = 1
transitions = IntrusionRecoveryPomdpUtil.generate_transitions(game_config=game_config)
rewards = IntrusionRecoveryPomdpUtil.generate_rewards(game_config=game_config)
game_description = f"{len(game_config.states)} {num_partitions} {len(game_config.actions)} " \
f"{len(game_config.actions)} " \
f"{len(game_config.observations)} {len(transitions)} " \
f"{len(rewards)} {game_config.discount_factor}"
state_desriptions = []
for s in game_config.states:
state_desriptions.append(f"{s} {0}")
player_1_actions = ["WAIT", "RECOVER"]
player_2_actions = ["FALSEALARM", "ATTACK"]

player_2_legal_actions = []
for _ in game_config.states:
player_2_legal_actions.append(" ".join(list(map(lambda x: str(x), game_config.actions))))

player_1_legal_actions = []
player_1_legal_actions.append(" ".join(list(map(lambda x: str(x), game_config.actions))))

obs_desriptions = []
for i, o in enumerate(game_config.observations):
obs_desriptions.append(f"o_{o}")

initial_belief_str = f"{0} {' '.join(list(map(lambda x: str(x), game_config.b1)))}"
game_file_str = ""
game_file_str = game_file_str + game_description + "\n"
game_file_str = game_file_str + "\n".join(state_desriptions) + "\n"
game_file_str = game_file_str + "\n".join(player_1_actions) + "\n"
game_file_str = game_file_str + "\n".join(player_2_actions) + "\n"
game_file_str = game_file_str + "\n".join(obs_desriptions) + "\n"
game_file_str = game_file_str + "\n".join(player_2_legal_actions) + "\n"
game_file_str = game_file_str + "\n".join(player_1_legal_actions) + "\n"
game_file_str = game_file_str + "\n".join(transitions) + "\n"
game_file_str = game_file_str + "\n".join(rewards) + "\n"
game_file_str = game_file_str + initial_belief_str
with open('recovery_game.txt', 'w') as f:
f.write(game_file_str)
return game_file_str

0 comments on commit f858728

Please sign in to comment.