From 1b3241e6db03ce8f6991a04ce82661e1f41cf57c Mon Sep 17 00:00:00 2001 From: Danilo Lessa Bernardineli Date: Thu, 14 Dec 2023 20:32:55 -0300 Subject: [PATCH] Fix single_proc + add support for single_proc for multi-runs & sweeps (#315) * fix tests + rm simulations/ folder * add types.py * single run / multi mc is ok * fix for single run / single param * add support for single proc runs --------- Co-authored-by: Emanuel Lima --- cadCAD/configuration/utils/__init__.py | 52 +++--- cadCAD/engine/__init__.py | 77 ++++++--- cadCAD/engine/execution.py | 91 ++++++----- cadCAD/types.py | 31 +++- cadCAD/utils/__init__.py | 7 +- testing/test_runs.py | 210 +++++++++++++++++++++++++ 6 files changed, 377 insertions(+), 91 deletions(-) create mode 100644 testing/test_runs.py diff --git a/cadCAD/configuration/utils/__init__.py b/cadCAD/configuration/utils/__init__.py index 5063a007..fce7bcb7 100644 --- a/cadCAD/configuration/utils/__init__.py +++ b/cadCAD/configuration/utils/__init__.py @@ -1,10 +1,11 @@ -import pandas as pd -import numpy as np +import pandas as pd # type: ignore from datetime import datetime, timedelta from collections import Counter from copy import deepcopy from functools import reduce -from funcy import curry +from funcy import curry # type: ignore +from cadCAD.types import * +from typing import Union, Dict, List from cadCAD.configuration.utils.depreciationHandler import sanitize_partial_state_updates from cadCAD.utils import dict_filter, contains_type, flatten_tabulated_dict, tabulate_dict @@ -161,27 +162,40 @@ def env_update(state_dict, sweep_dict, target_value): curry(trigger)(end_substep)(trigger_field)(trigger_vals)(funct_list) -def config_sim(d): - def process_variables(d): - return flatten_tabulated_dict(tabulate_dict(d)) +def config_sim(config_dict: ConfigurationDict): - if "N" in d: - if d["N"] <= 0: + if "N" in config_dict: + if config_dict["N"] <= 0: raise ValueError("'N' must be > 0") + else: + pass else: - raise KeyError("The 'sim_configs' dictionary must contain the key 'N'") - - if "T" not in d: - raise KeyError("The 'sim_configs' dictionary must contain the key 'T'") + raise KeyError("The 'sim_configs' dictionary must contain the key 'N' (# of Monte Carlo Runs)") - if "M" in d: - M_lengths = len(list(set({key: len(value) for key, value in d["M"].items()}.values()))) - if M_lengths > 2: - raise Exception('`M` values require up to a maximum of 2 distinct lengths') - return [{"N": d["N"], "T": d["T"], "M": M} for M in process_variables(d["M"])] + if "T" not in config_dict: + raise KeyError("The 'sim_configs' dictionary must contain the key 'T' (Timestep Iterator)") else: - d["M"] = [{}] - return d + if "M" in config_dict: + params = config_dict['M'] + + param_values_length = {key: len(value) if type(value) == list else 0 + for key, value in params.items()} + param_values_length_set = set(param_values_length.values()) + distinct_param_value_lengths = len(param_values_length_set) + + if distinct_param_value_lengths > 2: + raise Exception('When sweeping, `M` list lengths should either be 1 and/or equal. More than two distinct lengths are not allowed') + elif (distinct_param_value_lengths == 1) and (0 in param_values_length_set): + return config_dict + elif (1 in param_values_length_set): + return [{**config_dict, "M": M} + for M in flatten_tabulated_dict(tabulate_dict(params))] + else: + raise Exception('When sweeping, `M` list lengths should either be 1 and/or equal. ') + + else: + config_dict["M"] = [{}] + return config_dict def psub_list(psu_block, psu_steps): diff --git a/cadCAD/engine/__init__.py b/cadCAD/engine/__init__.py index 57d84bc7..c154e72f 100644 --- a/cadCAD/engine/__init__.py +++ b/cadCAD/engine/__init__.py @@ -7,6 +7,7 @@ from cadCAD.configuration.utils import TensorFieldReport, configs_as_objs, configs_as_dicts from cadCAD.engine.simulation import Executor as SimExecutor from cadCAD.engine.execution import single_proc_exec, parallelize_simulations, local_simulations +from cadCAD.types import * VarDictType = Dict[str, List[Any]] StatesListsType = List[Dict[str, Any]] @@ -24,6 +25,17 @@ class ExecutionMode: multi_proc = 'multi_proc' +def auto_mode_switcher(config_amt: int): + try: + if config_amt == 1: + return ExecutionMode.single_mode, single_proc_exec + elif (config_amt > 1): + return ExecutionMode.multi_mode, parallelize_simulations + except AttributeError: + if config_amt < 1: + raise ValueError('N must be >= 1!') + + class ExecutionContext: def __init__(self, context=ExecutionMode.local_mode, method=None, additional_objs=None) -> None: self.name = context @@ -39,7 +51,7 @@ def distroduce_proc( ExpIDs, SubsetIDs, SubsetWindows, - configured_n, # exec_method, + configured_n, # exec_method, sc, additional_objs=additional_objs ): return method( @@ -47,7 +59,7 @@ def distroduce_proc( ExpIDs, SubsetIDs, SubsetWindows, - configured_n, # exec_method, + configured_n, # exec_method, sc, additional_objs ) @@ -56,8 +68,8 @@ def distroduce_proc( class Executor: def __init__(self, - exec_context: ExecutionContext, configs: List[Configuration], sc=None, empty_return=False - ) -> None: + exec_context: ExecutionContext, configs: List[Configuration], sc=None, empty_return=False + ) -> None: self.sc = sc self.SimExecutor = SimExecutor self.exec_method = exec_context.method @@ -70,7 +82,8 @@ def execute(self) -> Tuple[Any, Any, Dict[str, Any]]: return [], [], [] config_proc = Processor() - create_tensor_field = TensorFieldReport(config_proc).create_tensor_field + create_tensor_field = TensorFieldReport( + config_proc).create_tensor_field sessions = [] var_dict_list, states_lists = [], [] @@ -105,18 +118,30 @@ def execute(self) -> Tuple[Any, Any, Dict[str, Any]]: var_dict_list.append(x.sim_config['M']) states_lists.append([x.initial_state]) eps.append(list(x.exogenous_states.values())) - configs_structs.append(config_proc.generate_config(x.initial_state, x.partial_state_update_blocks, eps[config_idx])) + configs_structs.append(config_proc.generate_config( + x.initial_state, x.partial_state_update_blocks, eps[config_idx])) env_processes_list.append(x.env_processes) partial_state_updates.append(x.partial_state_update_blocks) sim_executors.append(SimExecutor(x.policy_ops).simulation) config_idx += 1 - def get_final_dist_results(simulations, psus, eps, sessions): - tensor_fields = [create_tensor_field(psu, ep) for psu, ep in list(zip(psus, eps))] + remote_threshold = 100 + config_amt = len(self.configs) + + def get_final_dist_results(simulations: List[StateHistory], + psus: List[StateUpdateBlocks], + eps, + sessions: List[SessionDict]): + tensor_fields = [create_tensor_field( + psu, ep) for psu, ep in list(zip(psus, eps))] return simulations, tensor_fields, sessions - def get_final_results(simulations, psus, eps, sessions, remote_threshold): + def get_final_results(simulations: List[StateHistory], + psus: List[StateUpdateBlocks], + eps, + sessions: List[SessionDict], + remote_threshold: int): flat_timesteps, tensor_fields = [], [] for sim_result, psu, ep in list(zip(simulations, psus, eps)): flat_timesteps.append(flatten(sim_result)) @@ -128,25 +153,23 @@ def get_final_results(simulations, psus, eps, sessions, remote_threshold): elif config_amt > 1: return flat_simulations, tensor_fields, sessions - remote_threshold = 100 - config_amt = len(self.configs) - - def auto_mode_switcher(config_amt): - try: - if config_amt == 1: - return ExecutionMode.single_mode, single_proc_exec - elif (config_amt > 1): - return ExecutionMode.multi_mode, parallelize_simulations - except AttributeError: - if config_amt < 1: - raise ValueError('N must be >= 1!') - final_result = None original_N = len(configs_as_dicts(self.configs)) if self.exec_context != ExecutionMode.distributed: # Consider Legacy Support - if self.exec_context != ExecutionMode.local_mode: - self.exec_context, self.exec_method = auto_mode_switcher(config_amt) + if self.exec_context == ExecutionMode.local_mode: + self.exec_context, self.exec_method = auto_mode_switcher( + config_amt) + elif self.exec_context == ExecutionMode.single_mode or self.exec_context == ExecutionMode.single_proc: + self.exec_context, self.exec_method = ExecutionMode.single_mode, single_proc_exec + elif self.exec_context == ExecutionMode.multi_mode or self.exec_context == ExecutionMode.multi_proc: + if config_amt == 1: + raise ValueError("Multi mode must have at least 2 configs") + else: + self.exec_context, self.exec_method = ExecutionMode.multi_mode, parallelize_simulations + else: + raise ValueError("Invalid execution mode specified") + print("Execution Method: " + self.exec_method.__name__) simulations_results = self.exec_method( @@ -154,14 +177,16 @@ def auto_mode_switcher(config_amt): ExpIDs, SubsetIDs, SubsetWindows, original_N ) - final_result = get_final_results(simulations_results, partial_state_updates, eps, sessions, remote_threshold) + final_result = get_final_results( + simulations_results, partial_state_updates, eps, sessions, remote_threshold) elif self.exec_context == ExecutionMode.distributed: print("Execution Method: " + self.exec_method.__name__) simulations_results = self.exec_method( sim_executors, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, SimIDs, RunIDs, ExpIDs, SubsetIDs, SubsetWindows, original_N, self.sc ) - final_result = get_final_dist_results(simulations_results, partial_state_updates, eps, sessions) + final_result = get_final_dist_results( + simulations_results, partial_state_updates, eps, sessions) t2 = time() print(f"Total execution time: {t2 - t1 :.2f}s") diff --git a/cadCAD/engine/execution.py b/cadCAD/engine/execution.py index 41e86e87..fdf0452c 100644 --- a/cadCAD/engine/execution.py +++ b/cadCAD/engine/execution.py @@ -1,7 +1,7 @@ from typing import Callable, Dict, List, Any, Tuple -from pathos.multiprocessing import ProcessPool as PPool +from pathos.multiprocessing import ProcessPool as PPool # type: ignore from collections import Counter - +from cadCAD.types import * from cadCAD.utils import flatten VarDictType = Dict[str, List[Any]] @@ -11,26 +11,31 @@ def single_proc_exec( - simulation_execs: List[Callable], - var_dict_list: List[VarDictType], - states_lists: List[StatesListsType], - configs_structs: List[ConfigsType], - env_processes_list: List[EnvProcessesType], - Ts: List[range], - SimIDs, - Ns: List[int], + simulation_execs: List[ExecutorFunction], + var_dict_list: List[Parameters], + states_lists: List[StateHistory], + configs_structs: List[StateUpdateBlocks], + env_processes_list: List[EnvProcesses], + Ts: List[TimeSeq], + SimIDs: List[SimulationID], + Ns: List[Run], ExpIDs: List[int], - SubsetIDs, - SubsetWindows, - configured_n + SubsetIDs: List[SubsetID], + SubsetWindows: List[SubsetWindow], + configured_n: List[N_Runs] ): + + # HACK for making it run with N_Runs=1 + if type(var_dict_list) == list: + var_dict_list = var_dict_list[0] + print(f'Execution Mode: single_threaded') - params = [ + raw_params: List[List] = [ simulation_execs, states_lists, configs_structs, env_processes_list, Ts, SimIDs, Ns, SubsetIDs, SubsetWindows ] simulation_exec, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window = list( - map(lambda x: x.pop(), params) + map(lambda x: x.pop(), raw_params) ) result = simulation_exec( var_dict_list, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window, configured_n @@ -38,19 +43,22 @@ def single_proc_exec( return flatten(result) + + + def parallelize_simulations( - simulation_execs: List[Callable], - var_dict_list: List[VarDictType], - states_lists: List[StatesListsType], - configs_structs: List[ConfigsType], - env_processes_list: List[EnvProcessesType], - Ts: List[range], - SimIDs, - Ns: List[int], + simulation_execs: List[ExecutorFunction], + var_dict_list: List[Parameters], + states_lists: List[StateHistory], + configs_structs: List[StateUpdateBlocks], + env_processes_list: List[EnvProcesses], + Ts: List[TimeSeq], + SimIDs: List[SimulationID], + Ns: List[Run], ExpIDs: List[int], - SubsetIDs, - SubsetWindows, - configured_n + SubsetIDs: List[SubsetID], + SubsetWindows: List[SubsetWindow], + configured_n: List[N_Runs] ): print(f'Execution Mode: parallelized') @@ -104,32 +112,29 @@ def process_executor(params): def local_simulations( - simulation_execs: List[Callable], - var_dict_list: List[VarDictType], - states_lists: List[StatesListsType], - configs_structs: List[ConfigsType], - env_processes_list: List[EnvProcessesType], - Ts: List[range], - SimIDs, - Ns: List[int], - ExpIDs: List[int], - SubsetIDs, - SubsetWindows, - configured_n + simulation_execs: List[ExecutorFunction], + var_dict_list: List[Parameters], + states_lists: List[StateHistory], + configs_structs: List[StateUpdateBlocks], + env_processes_list: List[EnvProcesses], + Ts: List[TimeSeq], + SimIDs: List[SimulationID], + Ns: List[Run], + ExpIDs: List[int], + SubsetIDs: List[SubsetID], + SubsetWindows: List[SubsetWindow], + configured_n: List[N_Runs] ): config_amt = len(configs_structs) - _params = None if config_amt == 1: # and configured_n != 1 - _params = var_dict_list[0] return single_proc_exec( - simulation_execs, _params, states_lists, configs_structs, env_processes_list, + simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, SimIDs, Ns, ExpIDs, SubsetIDs, SubsetWindows, configured_n ) elif config_amt > 1: # and configured_n != 1 - _params = var_dict_list return parallelize_simulations( - simulation_execs, _params, states_lists, configs_structs, env_processes_list, + simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, SimIDs, Ns, ExpIDs, SubsetIDs, SubsetWindows, configured_n ) # elif config_amt > 1 and configured_n == 1: diff --git a/cadCAD/types.py b/cadCAD/types.py index 5b47964e..5e8eb274 100644 --- a/cadCAD/types.py +++ b/cadCAD/types.py @@ -1,7 +1,9 @@ -from typing import TypedDict, Callable, Union, Dict, List, Tuple +from typing import TypedDict, Callable, Union, Dict, List, Tuple, Iterator +from collections import deque State = Dict[str, object] Parameters = Dict[str, object] +SweepableParameters = Dict[str, list[object]] Substep = int StateHistory = List[List[State]] PolicyOutput = Dict[str, object] @@ -16,3 +18,30 @@ class StateUpdateBlock(TypedDict): StateUpdateBlocks = List[StateUpdateBlock] + +class ConfigurationDict(TypedDict): + T: Iterator # Generator for the timestep variable + N: int # Number of MC Runs + M: Union[Parameters, SweepableParameters] # Parameters / List of Parameter to Sweep + + +EnvProcesses = object +TimeSeq = Iterator +SimulationID = int +Run = int +SubsetID = int +SubsetWindow = Iterator +N_Runs = int + +ExecutorFunction = Callable[[Parameters, StateHistory, StateUpdateBlocks, EnvProcesses, TimeSeq, SimulationID, Run, SubsetID, SubsetWindow, N_Runs], object] +ExecutionParameter = Tuple[ExecutorFunction, Parameters, StateHistory, StateUpdateBlocks, EnvProcesses, TimeSeq, SimulationID, Run, SubsetID, SubsetWindow, N_Runs] + + +class SessionDict(TypedDict): + user_id: str + experiment_id: int + session_id: str + simulation_id: int + run_id: int + subset_id: int + subset_window: deque diff --git a/cadCAD/utils/__init__.py b/cadCAD/utils/__init__.py index 2c75a57b..cb7977a0 100644 --- a/cadCAD/utils/__init__.py +++ b/cadCAD/utils/__init__.py @@ -3,11 +3,14 @@ from collections import defaultdict from itertools import product import warnings +from typing import Union +from cadCAD.types import * +from typing import List, Dict, Union import functools import operator -from pandas import DataFrame +from pandas import DataFrame # type: ignore class SilentDF(DataFrame): @@ -99,7 +102,7 @@ def tabulate_dict(d: Dict[str, List[int]]) -> Dict[str, List[int]]: def flatten_tabulated_dict(d: Dict[str, List[int]]) -> List[Dict[str, int]]: max_len = get_max_dict_val_len(d) - dl = [{} for i in range(max_len)] + dl: list[dict] = [{} for i in range(max_len)] for k, vl in d.items(): for v, i in zip(vl, list(range(len(vl)))): diff --git a/testing/test_runs.py b/testing/test_runs.py new file mode 100644 index 00000000..2c33c2b4 --- /dev/null +++ b/testing/test_runs.py @@ -0,0 +1,210 @@ +from typing import Dict, List +from cadCAD.engine import Executor, ExecutionContext, ExecutionMode +from cadCAD.configuration import Experiment +from cadCAD.configuration.utils import env_trigger, var_substep_trigger, config_sim, psub_list +from cadCAD.types import * +import pandas as pd # type: ignore +import types +import inspect +import pytest + +def describe_or_return(v: object) -> object: + """ + Thanks @LinuxIsCool! + """ + if isinstance(v, types.FunctionType): + return f'function: {v.__name__}' + elif isinstance(v, types.LambdaType) and v.__name__ == '': + return f'lambda: {inspect.signature(v)}' + else: + return v + + +def select_M_dict(M_dict: Dict[str, object], keys: set) -> Dict[str, object]: + """ + Thanks @LinuxIsCool! + """ + return {k: describe_or_return(v) for k, v in M_dict.items() if k in keys} + + +def select_config_M_dict(configs: list, i: int, keys: set) -> Dict[str, object]: + return select_M_dict(configs[i].sim_config['M'], keys) + + +def drop_substeps(_df): + first_ind = (_df.substep == 0) & (_df.timestep == 0) + last_ind = _df.substep == max(_df.substep) + inds_to_drop = first_ind | last_ind + return _df.copy().loc[inds_to_drop].drop(columns=['substep']) + + +def assign_params(_df: pd.DataFrame, configs) -> pd.DataFrame: + """ + Based on `cadCAD-tools` package codebase, by @danlessa + """ + M_dict = configs[0].sim_config['M'] + params_set = set(M_dict.keys()) + selected_params = params_set + + # Attribute parameters to each row + # 1. Assign the parameter set from the first row first, so that + # columns are created + first_param_dict = select_config_M_dict(configs, 0, selected_params) + + # 2. Attribute parameter on an (simulation, subset, run) basis + df = _df.assign(**first_param_dict).copy() + for i, (_, subset_df) in enumerate(df.groupby(['simulation', 'subset', 'run'])): + df.loc[subset_df.index] = subset_df.assign(**select_config_M_dict(configs, + i, + selected_params)) + return df + + + + +SWEEP_PARAMS: Dict[str, List] = { + 'alpha': [1], + 'beta': [lambda x: 2 * x, lambda x: x], + 'gamma': [3, 4], + 'omega': [7] + } + +SINGLE_PARAMS: Dict[str, object] = { + 'alpha': 1, + 'beta': lambda x: x, + 'gamma': 3, + 'omega': 5 + } + + +def create_experiment(N_RUNS=2, N_TIMESTEPS=3, params: dict=SWEEP_PARAMS): + psu_steps = ['m1', 'm2', 'm3'] + system_substeps = len(psu_steps) + var_timestep_trigger = var_substep_trigger([0, system_substeps]) + env_timestep_trigger = env_trigger(system_substeps) + env_process = {} + + + # ['s1', 's2', 's3', 's4'] + # Policies per Mechanism + def gamma(params: Parameters, substep: Substep, history: StateHistory, state: State, **kwargs): + return {'gamma': params['gamma']} + + + def omega(params: Parameters, substep: Substep, history: StateHistory, state: State, **kwarg): + return {'omega': params['omega']} + + + # Internal States per Mechanism + def alpha(params: Parameters, substep: Substep, history: StateHistory, state: State, _input: PolicyOutput, **kwargs): + return 'alpha_var', params['alpha'] + + + def beta(params: Parameters, substep: Substep, history: StateHistory, state: State, _input: PolicyOutput, **kwargs): + return 'beta_var', params['beta'] + + def gamma_var(params: Parameters, substep: Substep, history: StateHistory, state: State, _input: PolicyOutput, **kwargs): + return 'gamma_var', params['gamma'] + + def omega_var(params: Parameters, substep: Substep, history: StateHistory, state: State, _input: PolicyOutput, **kwargs): + return 'omega_var', params['omega'] + + + def policies(params: Parameters, substep: Substep, history: StateHistory, state: State, _input: PolicyOutput, **kwargs): + return 'policies', _input + + + def sweeped(params: Parameters, substep: Substep, history: StateHistory, state: State, _input: PolicyOutput, **kwargs): + return 'sweeped', {'beta': params['beta'], 'gamma': params['gamma']} + + psu_block: dict = {k: {"policies": {}, "states": {}} for k in psu_steps} + for m in psu_steps: + psu_block[m]['policies']['gamma'] = gamma + psu_block[m]['policies']['omega'] = omega + psu_block[m]["states"]['alpha_var'] = alpha + psu_block[m]["states"]['beta_var'] = beta + psu_block[m]["states"]['gamma_var'] = gamma_var + psu_block[m]["states"]['omega_var'] = omega_var + psu_block[m]['states']['policies'] = policies + psu_block[m]["states"]['sweeped'] = var_timestep_trigger(y='sweeped', f=sweeped) + + + # Genesis States + genesis_states = { + 'alpha_var': 0, + 'beta_var': 0, + 'gamma_var': 0, + 'omega_var': 0, + 'policies': {}, + 'sweeped': {} + } + + # Environment Process + env_process['sweeped'] = env_timestep_trigger(trigger_field='timestep', trigger_vals=[5], funct_list=[lambda _g, x: _g['beta']]) + + sim_config = config_sim( + { + "N": N_RUNS, + "T": range(N_TIMESTEPS), + "M": params, # Optional + } + ) + + # New Convention + partial_state_update_blocks = psub_list(psu_block, psu_steps) + + exp = Experiment() + exp.append_model( + sim_configs=sim_config, + initial_state=genesis_states, + env_processes=env_process, + partial_state_update_blocks=partial_state_update_blocks + ) + return exp + + +def test_mc_sweep_experiment(): + experiment_assertions(create_experiment(N_RUNS=2, N_TIMESTEPS=2, params=SWEEP_PARAMS), ExecutionMode.local_mode) + experiment_assertions(create_experiment(N_RUNS=2, N_TIMESTEPS=2, params=SWEEP_PARAMS), ExecutionMode.single_mode) + experiment_assertions(create_experiment(N_RUNS=2, N_TIMESTEPS=2, params=SWEEP_PARAMS), ExecutionMode.multi_mode) + +def test_unique_sweep_experiment(): + experiment_assertions(create_experiment(N_RUNS=1, N_TIMESTEPS=2, params=SWEEP_PARAMS), ExecutionMode.local_mode) + experiment_assertions(create_experiment(N_RUNS=1, N_TIMESTEPS=2, params=SWEEP_PARAMS), ExecutionMode.single_mode) + experiment_assertions(create_experiment(N_RUNS=1, N_TIMESTEPS=2, params=SWEEP_PARAMS), ExecutionMode.multi_mode) + +def test_mc_single_experiment(): + experiment_assertions(create_experiment(N_RUNS=2, N_TIMESTEPS=2, params=SINGLE_PARAMS), ExecutionMode.local_mode) + experiment_assertions(create_experiment(N_RUNS=2, N_TIMESTEPS=2, params=SINGLE_PARAMS), ExecutionMode.single_mode) + experiment_assertions(create_experiment(N_RUNS=2, N_TIMESTEPS=2, params=SINGLE_PARAMS), ExecutionMode.multi_mode) + +def test_unique_single_experiment(): + experiment_assertions(create_experiment(N_RUNS=1, N_TIMESTEPS=2, params=SINGLE_PARAMS), ExecutionMode.local_mode) + experiment_assertions(create_experiment(N_RUNS=1, N_TIMESTEPS=2, params=SINGLE_PARAMS), ExecutionMode.single_mode) + with pytest.raises(ValueError) as e_info: + experiment_assertions(create_experiment(N_RUNS=1, N_TIMESTEPS=2, params=SINGLE_PARAMS), ExecutionMode.multi_mode) + + + +def experiment_assertions(exp, mode=None): + if mode == None: + mode = ExecutionMode().local_mode + exec_context = ExecutionContext(mode) + executor = Executor(exec_context=exec_context, configs=exp.configs) + (records, tensor_field, _) = executor.execute() + df = drop_substeps(assign_params(pd.DataFrame(records), exp.configs)) + + # XXX: parameters should always be of the same type. Else, the test will fail + first_sim_config = exp.configs[0].sim_config['M'] + + + for (i, row) in df.iterrows(): + if row.timestep > 0: + + assert row['alpha_var'] == row['alpha'] + assert type(row['alpha_var']) == type(first_sim_config['alpha']) + assert row['gamma_var'] == row['gamma'] + assert type(row['gamma_var']) == type(first_sim_config['gamma']) + assert row['omega_var'] == row['omega'] + assert type(row['omega_var']) == type(first_sim_config['omega']) +