From cacc902881bc92bde16d72863d2ede0d51774e5f Mon Sep 17 00:00:00 2001
From: Jeroen Overschie
Date: Mon, 7 Jun 2021 18:09:54 +0200
Subject: [PATCH] =?UTF-8?q?Finish=20multiprocessing=20support=20?=
 =?UTF-8?q?=F0=9F=8E=89=20->=20also,=20move=20caching=20to=20Estimator=20?=
 =?UTF-8?q?=F0=9F=99=8C=F0=9F=8F=BB?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 fseval/conf/pipeline/rank_and_validate.yaml   |  1 +
 fseval/main.py                                | 10 ++++--
 fseval/pipeline/estimator.py                  | 26 ++++++++++++--
 fseval/pipelines/_experiment.py               | 35 +++++++++++++++----
 .../rank_and_validate/_components.py          | 11 ++++--
 fseval/pipelines/rank_and_validate/_config.py |  3 ++
 .../rank_and_validate/_ranking_validator.py   | 27 +++++++-------
 .../rank_and_validate/_subset_validator.py    | 25 +++++++------
 fseval/storage_providers/wandb.py             | 25 +++++++++----
 9 files changed, 120 insertions(+), 43 deletions(-)

diff --git a/fseval/conf/pipeline/rank_and_validate.yaml b/fseval/conf/pipeline/rank_and_validate.yaml
index 25ebaac..0900bcd 100644
--- a/fseval/conf/pipeline/rank_and_validate.yaml
+++ b/fseval/conf/pipeline/rank_and_validate.yaml
@@ -4,4 +4,5 @@ defaults:
   - /estimator@ranker: relieff
   - /estimator@validator: decision_tree
 n_bootstraps: 2
+n_jobs: -1
 all_features_to_select: range(1, min(50, p) + 1)
diff --git a/fseval/main.py b/fseval/main.py
index 8ae5294..27a9abb 100644
--- a/fseval/main.py
+++ b/fseval/main.py
@@ -80,18 +80,24 @@ def main(cfg: BaseConfig) -> None:
     X_train, X_test, y_train, y_test = cv.train_test_split(X, y)
 
     try:
+        logger.info(f"pipeline {TerminalColor.cyan('prefit')}...")
+        pipeline.prefit()
+        logger.info(f"pipeline {TerminalColor.cyan('fit')}...")
         pipeline.fit(X_train, y_train)
+        logger.info(f"pipeline {TerminalColor.cyan('postfit')}...")
+        pipeline.postfit()
+        logger.info(f"pipeline {TerminalColor.cyan('score')}...")
+        scores = pipeline.score(X_test, y_test)
     except Exception as e:
         print_exc()
         logger.error(e)
         logger.info(
-            "error occured during pipeline fitting step... "
+            "error occurred during pipeline `prefit`, `fit` or `score` step... "
             + "exiting with a status code 1."
         )
         callbacks.on_end(exit_code=1)
         raise e
 
-    scores = pipeline.score(X_test, y_test)
     logger.info(f"{pipeline_name} pipeline finished {TerminalColor.green('✓')}")
     callbacks.on_summary(scores)
     callbacks.on_end()
diff --git a/fseval/pipeline/estimator.py b/fseval/pipeline/estimator.py
index 190dedf..b20f04d 100644
--- a/fseval/pipeline/estimator.py
+++ b/fseval/pipeline/estimator.py
@@ -3,16 +3,22 @@
 from logging import Logger, getLogger
 from typing import Any, Optional
 
+from fseval.types import (
+    AbstractEstimator,
+    AbstractStorageProvider,
+    IncompatibilityError,
+    Task,
+)
 from hydra.utils import instantiate
 from omegaconf import II, MISSING, OmegaConf
 from sklearn.preprocessing import minmax_scale
 
-from fseval.types import AbstractEstimator, IncompatibilityError, Task
-
 
 @dataclass
 class EstimatorConfig:
     estimator: Any = None  # must have _target_ of type BaseEstimator.
+    use_cache_if_available: bool = True
+
     # tags
     multioutput: Optional[bool] = None
     multioutput_only: Optional[bool] = None
     requires_positive_X: Optional[bool] = None
@@ -30,6 +36,7 @@ class TaskedEstimatorConfig(EstimatorConfig):
     name: str = MISSING
     classifier: Optional[EstimatorConfig] = None
     regressor: Optional[EstimatorConfig] = None
+    use_cache_if_available: bool = True
     # tags
     multioutput: Optional[bool] = False
     multioutput_only: Optional[bool] = False
@@ -51,6 +58,7 @@ class Estimator(AbstractEstimator, EstimatorConfig):
 
     task: Task = MISSING
     logger: Logger = getLogger(__name__)
+    _is_fitted: bool = False
 
     @classmethod
     def _get_estimator_repr(cls, estimator):
@@ -66,13 +74,27 @@ def _get_class_repr(cls, estimator):
         class_name = type(estimator).__name__
         return f"{module_name}.{class_name}"
 
+    def _load_cache(self, filename: str, storage_provider: AbstractStorageProvider):
+        restored = storage_provider.restore_pickle(filename)
+        self.estimator = restored or self.estimator
+        self._is_fitted = restored is not None
+
+    def _save_cache(self, filename: str, storage_provider: AbstractStorageProvider):
+        storage_provider.save_pickle(filename, self.estimator)
+
     def fit(self, X, y):
+        # don't refit if a cache is available and `use_cache_if_available` is enabled
+        if self._is_fitted and self.use_cache_if_available:
+            return self
+
+        # rescale if necessary
         if self.requires_positive_X:
             X = minmax_scale(X)
             self.logger.info(
                 "rescaled X: this estimator strictly requires positive features."
             )
 
+        # fit
         self.logger.debug(f"Fitting {Estimator._get_class_repr(self)}...")
         self.estimator.fit(X, y)
         return self
diff --git a/fseval/pipelines/_experiment.py b/fseval/pipelines/_experiment.py
index b05a62c..24d5135 100644
--- a/fseval/pipelines/_experiment.py
+++ b/fseval/pipelines/_experiment.py
@@ -14,11 +14,13 @@ class Experiment(AbstractEstimator):
     estimators: List[AbstractEstimator] = field(default_factory=lambda: [])
     logger: Logger = getLogger(__name__)
-    n_jobs: Optional[int] = None
 
     def __post_init__(self):
         self.estimators = list(self._get_estimator())
 
+    def _get_n_jobs(self):
+        return None
+
     def _get_estimator(self):
         return []
 
@@ -50,8 +52,17 @@ def _step_text(self, step_name, step_number, estimator):
         )
 
     def _prepare_data(self, X, y):
+        """Callback. Can be used to implement any data preparation schemes."""
         return X, y
 
+    def prefit(self):
+        """Pre-fit hook. Is executed right before calling `fit()`. Can be used to load
+        estimators from cache or do any other preparatory work."""
+
+        for estimator in self.estimators:
+            if hasattr(estimator, "prefit") and callable(getattr(estimator, "prefit")):
+                estimator.prefit()
+
     def _fit_estimator(self, X, y, step_number, estimator):
         logger = self._logger(estimator)
         text = self._step_text("fit", step_number, estimator)
@@ -75,13 +86,13 @@ def fit(self, X, y) -> AbstractEstimator:
 
         X, y = self._prepare_data(X, y)
 
-        if self.n_jobs is not None:
-            assert (
-                self.n_jobs >= 1 or self.n_jobs == -1
-            ), f"incorrect `n_jobs`: {self.n_jobs}"
+        ## Run `fit`
+        n_jobs = self._get_n_jobs()
+        if n_jobs is not None:
+            assert n_jobs >= 1 or n_jobs == -1, f"incorrect `n_jobs`: {n_jobs}"
 
-            cpus = multiprocessing.cpu_count() if self.n_jobs == -1 else self.n_jobs
-            self.logger.info(f"Using {cpus} CPU's in parallel (n_jobs={self.n_jobs})")
+            cpus = multiprocessing.cpu_count() if n_jobs == -1 else n_jobs
+            self.logger.info(f"Using {cpus} CPUs in parallel (n_jobs={n_jobs})")
 
             star_input = [
                 (X, y, step_number, estimator)
@@ -100,6 +111,16 @@ def fit(self, X, y) -> AbstractEstimator:
 
         return self
 
+    def postfit(self):
+        """Post-fit hook. Is executed right after calling `fit()`. Can be used to save
+        estimators to cache, for example."""
+
+        for estimator in self.estimators:
+            if hasattr(estimator, "postfit") and callable(
+                getattr(estimator, "postfit")
+            ):
+                estimator.postfit()
+
     def transform(self, X, y):
         ...
 
diff --git a/fseval/pipelines/rank_and_validate/_components.py b/fseval/pipelines/rank_and_validate/_components.py
index 9c3713b..4aaa8eb 100644
--- a/fseval/pipelines/rank_and_validate/_components.py
+++ b/fseval/pipelines/rank_and_validate/_components.py
@@ -6,6 +6,7 @@
 import pandas as pd
 from fseval.callbacks import WandbCallback
 from fseval.pipeline.estimator import Estimator
+from fseval.types import TerminalColor
 from omegaconf import MISSING
 from sklearn.base import clone
 from tqdm import tqdm
@@ -116,7 +117,9 @@ def score(self, X, y):
             scores = scores.append(validation_score)
 
         scores["bootstrap_state"] = self.bootstrap_state
-        self.logger.info(f"scored bootstrap_state={self.bootstrap_state} ✓")
+        self.logger.info(
+            f"scored bootstrap_state={self.bootstrap_state} " + TerminalColor.green("✓")
+        )
         return scores
 
 
@@ -127,7 +130,11 @@ class BootstrappedRankAndValidate(Experiment, RankAndValidatePipeline):
     that various metrics can be better approximated."""
 
     logger: Logger = getLogger(__name__)
-    n_jobs: Optional[int] = -1  # utilize all CPU's
+
+    def _get_n_jobs(self):
+        """Allow each bootstrap experiment to run on a separate CPU."""
+
+        return self.n_jobs
 
     def _get_estimator(self):
         for bootstrap_state in np.arange(1, self.n_bootstraps + 1):
diff --git a/fseval/pipelines/rank_and_validate/_config.py b/fseval/pipelines/rank_and_validate/_config.py
index dea5f19..5bda2bf 100644
--- a/fseval/pipelines/rank_and_validate/_config.py
+++ b/fseval/pipelines/rank_and_validate/_config.py
@@ -1,5 +1,6 @@
 import logging
 from dataclasses import dataclass
+from typing import Optional
 
 from fseval.pipeline.estimator import Estimator, TaskedEstimatorConfig
 from fseval.pipeline.resample import Resample, ResampleConfig
@@ -20,6 +21,7 @@ class RankAndValidateConfig:
     ranker: TaskedEstimatorConfig = MISSING
     validator: TaskedEstimatorConfig = MISSING
     n_bootstraps: int = MISSING
+    n_jobs: Optional[int] = MISSING
     all_features_to_select: str = MISSING
 
 
@@ -33,6 +35,7 @@ class RankAndValidatePipeline(Pipeline):
     ranker: Estimator = MISSING
     validator: Estimator = MISSING
     n_bootstraps: int = MISSING
+    n_jobs: Optional[int] = MISSING
     all_features_to_select: str = MISSING
 
     def _get_config(self):
diff --git a/fseval/pipelines/rank_and_validate/_ranking_validator.py b/fseval/pipelines/rank_and_validate/_ranking_validator.py
index 3150eeb..b97859f 100644
--- a/fseval/pipelines/rank_and_validate/_ranking_validator.py
+++ b/fseval/pipelines/rank_and_validate/_ranking_validator.py
@@ -3,11 +3,10 @@
 
 import numpy as np
 import pandas as pd
+from fseval.types import IncompatibilityError
 from omegaconf import MISSING
 from sklearn.metrics import log_loss, r2_score
 
-from fseval.types import IncompatibilityError
-
 from .._experiment import Experiment
 from ._config import RankAndValidatePipeline
 
@@ -31,20 +30,24 @@ def __post_init__(self):
 
         super(RankingValidator, self).__post_init__()
 
+    @property
+    def _cache_filename(self):
+        override = f"bootstrap_state={self.bootstrap_state}"
+        filename = f"ranking[{override}].pickle"
+
+        return filename
+
     def _get_estimator(self):
         yield self.ranker
 
+    def prefit(self):
+        self.ranker._load_cache(self._cache_filename, self.storage_provider)
+
     def fit(self, X, y):
-        override = f"bootstrap_state={self.bootstrap_state}"
-        filename = f"ranking[{override}].pickle"
-        restored = self.storage_provider.restore_pickle(filename)
-
-        if restored:
-            self.ranker.estimator = restored
-            self.logger.info("restored ranking from storage provider ✓")
-        else:
-            super(RankingValidator, self).fit(X, y)
-            self.storage_provider.save_pickle(filename, self.ranker.estimator)
+        super(RankingValidator, self).fit(X, y)
+
+    def postfit(self):
+        self.ranker._save_cache(self._cache_filename, self.storage_provider)
 
     def score(self, X, y):
         """Scores a feature ranker, if a ground-truth on the desired dataset
diff --git a/fseval/pipelines/rank_and_validate/_subset_validator.py b/fseval/pipelines/rank_and_validate/_subset_validator.py
index 5f2cfe2..7426a14 100644
--- a/fseval/pipelines/rank_and_validate/_subset_validator.py
+++ b/fseval/pipelines/rank_and_validate/_subset_validator.py
@@ -1,11 +1,10 @@
 from dataclasses import dataclass
 
 import numpy as np
-from omegaconf import MISSING
-from sklearn.feature_selection import SelectFromModel
-
 from fseval.pipeline.estimator import Estimator
 from fseval.types import IncompatibilityError
+from omegaconf import MISSING
+from sklearn.feature_selection import SelectFromModel
 
 from .._experiment import Experiment
 from ._config import RankAndValidatePipeline
@@ -57,18 +56,22 @@ def _prepare_data(self, X, y):
         X = selector.transform(X)
         return X, y
 
-    def fit(self, X, y):
+    @property
+    def _cache_filename(self):
         override = f"bootstrap_state={self.bootstrap_state}"
         override += f",n_features_to_select={self.n_features_to_select}"
         filename = f"validation[{override}].pickle"
-        restored = self.storage_provider.restore_pickle(filename)
 
-        if restored:
-            self.validator.estimator = restored
-            self.logger.info("restored validator from storage provider ✓")
-        else:
-            super(SubsetValidator, self).fit(X, y)
-            self.storage_provider.save_pickle(filename, self.validator.estimator)
+        return filename
+
+    def prefit(self):
+        self.validator._load_cache(self._cache_filename, self.storage_provider)
+
+    def fit(self, X, y):
+        super(SubsetValidator, self).fit(X, y)
+
+    def postfit(self):
+        self.validator._save_cache(self._cache_filename, self.storage_provider)
 
     def score(self, X, y):
         score = super(SubsetValidator, self).score(X, y)
diff --git a/fseval/storage_providers/wandb.py b/fseval/storage_providers/wandb.py
index 5091379..9e1e474 100644
--- a/fseval/storage_providers/wandb.py
+++ b/fseval/storage_providers/wandb.py
@@ -3,7 +3,7 @@
 from pickle import dump, load
 from typing import Any, Callable, Dict
 
-from fseval.types import AbstractStorageProvider
+from fseval.types import AbstractStorageProvider, TerminalColor
 
 import wandb
 
@@ -11,6 +11,13 @@ class WandbStorageProvider(AbstractStorageProvider):
     logger: Logger = getLogger(__name__)
 
+    def _assert_wandb_available(self):
+        assert wandb.run is not None, (
+            "`wandb.run` is not available in this process. you are perhaps using multi-"
+            + "processing: make sure to only use the wandb storage provider from the "
+            + "main thread. see https://docs.wandb.ai/guides/track/advanced/distributed-training."
+        )
+
     def set_config(self, config: Dict):
         assert config["callbacks"].get(
             "wandb"
@@ -18,8 +25,7 @@ def set_config(self, config: Dict):
         super(WandbStorageProvider, self).set_config(config)
 
     def save(self, filename: str, writer: Callable, mode: str = "w"):
-        if __name__ != "__main__":
-            return
+        self._assert_wandb_available()
 
         filedir = wandb.run.dir  # type: ignore
         filepath = os.path.join(filedir, filename)
@@ -28,7 +34,10 @@ def save(self, filename: str, writer: Callable, mode: str = "w"):
             writer(file_handle)
 
         wandb.save(filename, base_path="/")  # type: ignore
-        self.logger.info(f"successfully saved `{filename}` to wandb servers ✓")
+        self.logger.info(
+            f"successfully saved {TerminalColor.yellow(filename)} to wandb servers "
+            + TerminalColor.green("✓")
+        )
 
     def save_pickle(self, filename: str, obj: Any):
         self.save(filename, lambda file: dump(obj, file), mode="wb")
@@ -53,8 +62,7 @@ def _get_restore_file_handle(self, filename: str):
             return None
 
     def restore(self, filename: str, reader: Callable, mode: str = "r") -> Any:
-        if __name__ != "__main__":
-            return
+        self._assert_wandb_available()
 
         file_handle = self._get_restore_file_handle(filename)
 
@@ -66,7 +74,10 @@ def restore(self, filename: str, reader: Callable, mode: str = "r") -> Any:
         with open(filepath, mode=mode) as file_handle:
             file = reader(file_handle)
 
-        self.logger.info(f"successfully restored `{filename}` from wandb servers ✓")
+        self.logger.info(
+            f"successfully restored {TerminalColor.yellow(filename)} from wandb servers "
+            + TerminalColor.green("✓")
+        )
         return file
 
     def restore_pickle(self, filename: str) -> Any:
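
Note on the design this patch introduces: all storage I/O now happens in `prefit` and `postfit`, which run in the main process (where `wandb.run` is available), while `fit` may fan the bootstrap experiments out over a `multiprocessing.Pool`. Below is a minimal, self-contained sketch of that lifecycle pattern. The names used here (`CachedEstimator`, `Experiment`, `_fit_one`) are toy stand-ins, not the actual fseval API, and plain pickle files stand in for the wandb storage provider.

import multiprocessing
import os
import pickle
from dataclasses import dataclass, field
from typing import Any, List, Optional


@dataclass
class CachedEstimator:
    """Toy estimator following the prefit/fit/postfit lifecycle (hypothetical)."""

    cache_file: str
    model: Any = None
    _is_fitted: bool = False

    def prefit(self):
        # main process: restore a previously fitted model from cache, if any
        if os.path.exists(self.cache_file):
            with open(self.cache_file, "rb") as f:
                self.model = pickle.load(f)
            self._is_fitted = True

    def fit(self, X, y):
        if self._is_fitted:
            return self  # skip refitting on a cache hit
        self.model = sum(y) / len(y)  # stand-in for real fitting work
        self._is_fitted = True
        return self

    def postfit(self):
        # main process: persist the fitted model to cache
        with open(self.cache_file, "wb") as f:
            pickle.dump(self.model, f)


def _fit_one(args):
    # top-level function so it is picklable by multiprocessing
    X, y, estimator = args
    return estimator.fit(X, y)


@dataclass
class Experiment:
    estimators: List[CachedEstimator] = field(default_factory=list)
    n_jobs: Optional[int] = None

    def prefit(self):
        for estimator in self.estimators:
            estimator.prefit()

    def fit(self, X, y):
        if self.n_jobs is not None:
            cpus = multiprocessing.cpu_count() if self.n_jobs == -1 else self.n_jobs
            with multiprocessing.Pool(processes=cpus) as pool:
                # workers fit copies; collect the returned, fitted estimators
                self.estimators = pool.map(
                    _fit_one, [(X, y, est) for est in self.estimators]
                )
        else:
            for estimator in self.estimators:
                estimator.fit(X, y)
        return self

    def postfit(self):
        for estimator in self.estimators:
            estimator.postfit()


if __name__ == "__main__":
    X, y = [[0], [1]], [0, 1]
    experiment = Experiment(
        estimators=[CachedEstimator(f"est_{i}.pickle") for i in range(2)],
        n_jobs=-1,
    )
    experiment.prefit()   # cache restore: main process only
    experiment.fit(X, y)  # may run in worker processes
    experiment.postfit()  # cache save: main process only

One subtlety the sketch makes explicit: worker processes operate on copies of the estimators, so state mutated inside `fit` would be lost unless the fitted estimators are returned from the pool and reassigned. That is why `_fit_one` returns the estimator and `fit` rebinds `self.estimators` in this sketch.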