Skip to content

Commit

Permalink
GCM auto assignment now returns a summary object
Browse files Browse the repository at this point in the history
The summary object contains information about the evaluated models and model choices. This object is printable to provide a quick summary.

Signed-off-by: Patrick Bloebaum <bloebp@amazon.com>
  • Loading branch information
bloebp committed Nov 2, 2023
1 parent 86fcb28 commit d83fc3f
Show file tree
Hide file tree
Showing 7 changed files with 277 additions and 37 deletions.
157 changes: 129 additions & 28 deletions dowhy/gcm/auto.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import warnings
from enum import Enum, auto
from functools import partial
from typing import Callable, List, Optional, Union
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

import networkx as nx
import numpy as np
import pandas as pd
from joblib import Parallel, delayed
Expand Down Expand Up @@ -53,7 +54,7 @@
from dowhy.graph import get_ordered_predecessors, is_root_node

_LIST_OF_POTENTIAL_CLASSIFIERS_GOOD = [
partial(create_logistic_regression_classifier, max_iter=1000),
partial(create_logistic_regression_classifier, max_iter=10000),
create_hist_gradient_boost_classifier,
]
_LIST_OF_POTENTIAL_REGRESSORS_GOOD = [
Expand All @@ -71,7 +72,6 @@
]
_LIST_OF_POTENTIAL_REGRESSORS_BETTER = _LIST_OF_POTENTIAL_REGRESSORS_GOOD + [
create_ridge_regressor,
create_polynom_regressor,
partial(create_lasso_regressor, max_iter=5000),
create_random_forest_regressor,
create_support_vector_regressor,
Expand All @@ -87,12 +87,56 @@ class AssignmentQuality(Enum):
BEST = auto()


class AutoAssignmentSummary:
    """Summary class for logging and storing information of the auto assignment process.

    For every node, the summary stores a list of log messages and a list of
    ``(model, performance, metric_name)`` entries describing the models that were evaluated for
    that node. Converting the object to a string (e.g. via ``print``) renders a readable report.
    """

    def __init__(self):
        # Maps each node to {"messages": [...], "model_performances": [(model, performance, metric_name), ...]}.
        self._nodes: Dict[Any, Dict[str, List[Any]]] = {}

    def _init_node_entry(self, node: Any) -> None:
        """Creates an empty entry for the given node if the node has not been recorded yet."""
        if node not in self._nodes:
            self._nodes[node] = {"messages": [], "model_performances": []}

    def add_node_log_message(self, node: Any, message: str) -> None:
        """Appends a log message to the entry of the given node.

        :param node: The node the message belongs to.
        :param message: The message to store.
        """
        self._init_node_entry(node)
        self._nodes[node]["messages"].append(message)

    def add_model_performance(
        self, node: Any, model: Union[str, Callable[[], Any]], performance: float, metric_name: str
    ) -> None:
        """Records the performance of an evaluated model for the given node.

        :param node: The node the model was evaluated for.
        :param model: Either a factory (a callable without arguments) that creates the model or a string
                      describing the model.
        :param performance: The value of the metric obtained for the model.
        :param metric_name: The name of the metric that was used for the evaluation.
        """
        # The node may not have received a log message yet, so make sure its entry exists.
        self._init_node_entry(node)
        self._nodes[node]["model_performances"].append((model, performance, metric_name))

    def __str__(self) -> str:
        summary_strings = ["Analyzed %d nodes." % len(self._nodes)]

        for node, entry in self._nodes.items():
            # Convert explicitly to a string; tuple-valued nodes would otherwise break the % formatting.
            summary_strings.append("--- Node: %s" % str(node))
            summary_strings.extend(entry["messages"])

            model_performances = entry["model_performances"]
            if not model_performances:
                continue

            # All entries of a node are expected to share the same metric, so the first one is reported.
            summary_strings.append(
                "For the model selection, the following models were evaluated on the %s metric:"
                % model_performances[0][2]
            )

            for model, performance, _ in model_performances:
                # A factory needs to be called to obtain an object with a descriptive string representation.
                model_description = str(model() if callable(model) else model).replace("()", "")
                summary_strings.append("%s: %s" % (model_description, str(performance)))

            summary_strings.append(
                "Based on the type of causal mechanism, the model with the lowest metric value "
                "represents the best choice."
            )

        return "\n".join(summary_strings)


def assign_causal_mechanisms(
causal_model: ProbabilisticCausalModel,
based_on: pd.DataFrame,
quality: AssignmentQuality = AssignmentQuality.GOOD,
override_models: bool = False,
) -> None:
) -> AutoAssignmentSummary:
"""Automatically assigns appropriate causal models. If causal models are already assigned to nodes and
override_models is set to False, this function only validates the assignments with respect to the graph structure.
Here, the validation checks whether root nodes have StochasticModels and non-root ConditionalStochasticModels
Expand Down Expand Up @@ -124,50 +168,93 @@ def assign_causal_mechanisms(
Model training speed: Slow
Model inference speed: Slow-Medium
Model accuracy: Best
:param override_models: If set to True, existing model assignments are replaced with automatically selected
ones. If set to False, the assigned models are only validated with respect to the graph structure.
:return: None
:param override_models: If set to True, existing model assignments are replaced with automatically selected
ones. If set to False, the assigned models are only validated with respect to the graph
structure.
:return: A summary object containing details about the model selection process.
"""
for node in causal_model.graph.nodes:
auto_assignment_summary = AutoAssignmentSummary()

for node in nx.topological_sort(causal_model.graph):
if not override_models and CAUSAL_MECHANISM in causal_model.graph.nodes[node]:
auto_assignment_summary.add_node_log_message(
node,
"Node %s already has a model assigned and the override parameter is False. Skipping this node." % node,
)
validate_causal_model_assignment(causal_model.graph, node)
continue
assign_causal_mechanism_node(causal_model, node, based_on, quality)

model_performances = assign_causal_mechanism_node(causal_model, node, based_on, quality)

if is_root_node(causal_model.graph, node):
auto_assignment_summary.add_node_log_message(
node,
"Node %s is a root node. Assigning '%s' to the node representing the marginal distribution."
% (node, causal_model.causal_mechanism(node)),
)
else:
auto_assignment_summary.add_node_log_message(
node,
"Node %s is a non-root node. Assigning '%s' to the node." % (node, causal_model.causal_mechanism(node)),
)

if isinstance(causal_model.causal_mechanism(node), AdditiveNoiseModel):
auto_assignment_summary.add_node_log_message(
node,
"This represents the causal relationship as "
+ str(node)
+ " := f("
+ ",".join([str(parent) for parent in get_ordered_predecessors(causal_model.graph, node)])
+ ") + N.",
)
elif isinstance(causal_model.causal_mechanism(node), ClassifierFCM):
auto_assignment_summary.add_node_log_message(
node,
"This represents the causal relationship as "
+ str(node)
+ " := f("
+ ",".join([str(parent) for parent in get_ordered_predecessors(causal_model.graph, node)])
+ ",N).",
)

for (model, performance, metric_name) in model_performances:
auto_assignment_summary.add_model_performance(node, model, performance, metric_name)

return auto_assignment_summary


def assign_causal_mechanism_node(
causal_model: ProbabilisticCausalModel,
node: str,
based_on: pd.DataFrame,
quality: AssignmentQuality = AssignmentQuality.GOOD,
) -> None:
causal_model: ProbabilisticCausalModel, node: str, based_on: pd.DataFrame, quality: AssignmentQuality
) -> List[Tuple[Callable[[], PredictionModel], float, str]]:
if is_root_node(causal_model.graph, node):
causal_model.set_causal_mechanism(node, EmpiricalDistribution())
model_performances = []
else:
prediction_model = select_model(
best_model, model_performances = select_model(
based_on[get_ordered_predecessors(causal_model.graph, node)].to_numpy(),
based_on[node].to_numpy(),
quality,
)

if isinstance(prediction_model, ClassificationModel):
causal_model.set_causal_mechanism(node, ClassifierFCM(prediction_model))
if isinstance(best_model, ClassificationModel):
causal_model.set_causal_mechanism(node, ClassifierFCM(best_model))
else:
causal_model.set_causal_mechanism(node, AdditiveNoiseModel(prediction_model))
causal_model.set_causal_mechanism(node, AdditiveNoiseModel(best_model))

return model_performances


def select_model(
X: np.ndarray, Y: np.ndarray, model_selection_quality: AssignmentQuality
) -> Union[PredictionModel, ClassificationModel]:
) -> Tuple[Union[PredictionModel, ClassificationModel], List[Tuple[Callable[[], PredictionModel], float, str]]]:
if model_selection_quality == AssignmentQuality.BEST:
try:
from dowhy.gcm.ml.autogluon import AutoGluonClassifier, AutoGluonRegressor

if is_categorical(Y):
return AutoGluonClassifier()
return AutoGluonClassifier(), []
else:
return AutoGluonRegressor()
return AutoGluonRegressor(), []
except ImportError:
raise RuntimeError(
"AutoGluon module not found! For the BEST auto assign quality, consider installing the "
Expand All @@ -187,12 +274,18 @@ def select_model(
if auto_apply_encoders(X, auto_fit_encoders(X)).shape[1] <= 5:
# Avoid too many features
list_of_regressor += [create_polynom_regressor]
list_of_classifier += [partial(create_polynom_logistic_regression_classifier, max_iter=1000)]
list_of_classifier += [partial(create_polynom_logistic_regression_classifier, max_iter=10000)]

if is_categorical(Y):
return find_best_model(list_of_classifier, X, Y, model_selection_splits=model_selection_splits)()
best_model, model_performances = find_best_model(
list_of_classifier, X, Y, model_selection_splits=model_selection_splits
)
return best_model(), model_performances
else:
return find_best_model(list_of_regressor, X, Y, model_selection_splits=model_selection_splits)()
best_model, model_performances = find_best_model(
list_of_regressor, X, Y, model_selection_splits=model_selection_splits
)
return best_model(), model_performances


def has_linear_relationship(X: np.ndarray, Y: np.ndarray, max_num_samples: int = 3000) -> bool:
Expand Down Expand Up @@ -253,27 +346,31 @@ def find_best_model(
max_samples_per_split: int = 10000,
model_selection_splits: int = 5,
n_jobs: Optional[int] = None,
) -> Callable[[], PredictionModel]:
) -> Tuple[Callable[[], PredictionModel], List[Tuple[Callable[[], PredictionModel], float, str]]]:
n_jobs = config.default_n_jobs if n_jobs is None else n_jobs

X, Y = shape_into_2d(X, Y)

is_classification_problem = isinstance(prediction_model_factories[0](), ClassificationModel)

metric_name = "given"

if metric is None:
metric_name = "(negative) F1"
if is_classification_problem:
metric = lambda y_true, y_preds: -metrics.f1_score(
y_true, y_preds, average="macro", zero_division=0
) # Higher score is better
else:
metric_name = "mean squared error (MSE)"
metric = metrics.mean_squared_error

labelBinarizer = None
if is_classification_problem:
labelBinarizer = MultiLabelBinarizer()
labelBinarizer.fit(Y)

kfolds = list(KFold(n_splits=model_selection_splits).split(range(X.shape[0])))
kfolds = list(KFold(n_splits=model_selection_splits, shuffle=True).split(range(X.shape[0])))

def estimate_average_score(prediction_model_factory: Callable[[], PredictionModel], random_seed: int) -> float:
set_random_seed(random_seed)
Expand Down Expand Up @@ -301,5 +398,9 @@ def estimate_average_score(prediction_model_factory: Callable[[], PredictionMode
delayed(estimate_average_score)(prediction_model_factory, int(random_seed))
for prediction_model_factory, random_seed in zip(prediction_model_factories, random_seeds)
)
sorted_results = sorted(
zip(prediction_model_factories, average_metric_scores, [metric_name] * len(prediction_model_factories)),
key=lambda x: x[1],
)

return sorted(zip(prediction_model_factories, average_metric_scores), key=lambda x: x[1])[0][0]
return sorted_results[0][0], sorted_results
20 changes: 18 additions & 2 deletions dowhy/gcm/causal_mechanisms.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import numpy as np

from dowhy.gcm.ml import ClassificationModel, PredictionModel
from dowhy.gcm.ml.regression import InvertibleFunction
from dowhy.gcm.ml.regression import InvertibleFunction, SklearnRegressionModel
from dowhy.gcm.util.general import is_categorical, shape_into_2d


Expand Down Expand Up @@ -155,9 +155,14 @@ def evaluate(self, parent_samples: np.ndarray, noise_samples: np.ndarray) -> np.
return self._invertible_function.evaluate(predictions + noise_samples)

def __str__(self) -> str:
if isinstance(self._prediction_model, SklearnRegressionModel):
prediction_model_string = self._prediction_model.sklearn_model.__class__.__name__
else:
prediction_model_string = self._prediction_model.__class__.__name__

return "%s with %s and an %s" % (
self.__class__.__name__,
self._prediction_model.__class__.__name__,
prediction_model_string,
self._invertible_function.__class__.__name__,
)

Expand Down Expand Up @@ -207,6 +212,14 @@ def __init__(self, prediction_model: PredictionModel, noise_model: Optional[Stoc
def clone(self):
return AdditiveNoiseModel(prediction_model=self.prediction_model.clone(), noise_model=self.noise_model.clone())

def __str__(self) -> str:
    """Describes the model as "AdditiveNoiseModel using <class name of the prediction model>".

    If the prediction model is a SklearnRegressionModel, the class name of its ``sklearn_model`` is
    reported instead of the class name of the wrapper.
    """
    model = self._prediction_model
    # Report the wrapped estimator rather than the wrapper when the model is a sklearn wrapper.
    described_object = model.sklearn_model if isinstance(model, SklearnRegressionModel) else model

    return "AdditiveNoiseModel using %s" % described_object.__class__.__name__


class ProbabilityEstimatorModel(ABC):
@abstractmethod
Expand Down Expand Up @@ -291,3 +304,6 @@ def get_class_names(self, class_indices: np.ndarray) -> List[str]:
@property
def classifier_model(self) -> ClassificationModel:
return self._classifier_model

def __repr__(self):
    """Returns a short description of this mechanism that includes the string form of its classifier model."""
    return "Classifier FCM based on %s" % self.classifier_model
6 changes: 1 addition & 5 deletions dowhy/gcm/independence_test/generalised_cov_measure.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,4 @@ def _create_model(
if not isinstance(model, AssignmentQuality):
return model()
else:
return select_model(
input_features,
target,
model,
)
return select_model(input_features, target, model)[0]
2 changes: 1 addition & 1 deletion dowhy/gcm/influence.py
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ def _get_icc_noise_function(
return prediction_model.predict

if prediction_model == "approx":
prediction_model = auto.select_model(noise_samples, target_samples, auto_assign_quality)
prediction_model = auto.select_model(noise_samples, target_samples, auto_assign_quality)[0]
prediction_model.fit(noise_samples, target_samples)

if target_is_categorical:
Expand Down
3 changes: 3 additions & 0 deletions dowhy/gcm/ml/regression.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ def clone(self):
"""
return SklearnRegressionModel(sklearn_mdl=sklearn.clone(self._sklearn_mdl))

def __str__(self):
    """Returns the string representation of the wrapped ``_sklearn_mdl`` object."""
    return str(self._sklearn_mdl)


def create_linear_regressor_with_given_parameters(
coefficients: np.ndarray, intercept: float = 0, **kwargs
Expand Down
8 changes: 7 additions & 1 deletion dowhy/gcm/stochastic_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,9 @@ def map_scipy_distribution_parameters_to_names(

return parameters_dictionary

def __str__(self) -> str:
    """Returns the ``name`` of the wrapped distribution object followed by " distribution"."""
    return str(self._distribution.name) + " distribution"


class EmpiricalDistribution(StochasticModel):
"""An implementation of a stochastic model that uniformly samples from data samples. By randomly returning a sample
Expand All @@ -202,6 +205,9 @@ def draw_samples(self, num_samples: int) -> np.ndarray:
def clone(self):
return EmpiricalDistribution()

def __str__(self) -> str:
    """Returns a fixed, human-readable name for this stochastic model."""
    return "Empirical Distribution"


class BayesianGaussianMixtureDistribution(StochasticModel):
def __init__(self) -> None:
Expand Down Expand Up @@ -247,7 +253,7 @@ def draw_samples(self, num_samples: int) -> np.ndarray:
return shape_into_2d(self._gmm_model.sample(num_samples)[0])

def __str__(self) -> str:
return "Approximated data distribution"
return "Gaussian Mixture Distribution"

def clone(self):
return BayesianGaussianMixtureDistribution()
Loading

0 comments on commit d83fc3f

Please sign in to comment.