diff --git a/.github/workflows/testing_ci.yml b/.github/workflows/testing_ci.yml index 141dcd18..190d8f3f 100644 --- a/.github/workflows/testing_ci.yml +++ b/.github/workflows/testing_ci.yml @@ -49,7 +49,6 @@ jobs: run: | which python which pip - pip install --upgrade pip pip install torch==${{ steps.determine_pytorch_ver.outputs.value }} -f https://download.pytorch.org/whl/cpu python -c "import torch; print('PyTorch:', torch.__version__)" @@ -58,6 +57,10 @@ jobs: pip install -r requirements.txt pip install torch-geometric torch-scatter torch-sparse -f "https://data.pyg.org/whl/torch-${{ steps.determine_pytorch_ver.outputs.value }}+cpu.html" pip install pypots[dev] + python_site_path=`python -c "import site; print(site.getsitepackages()[0])"` + echo "python site-packages path: $python_site_path" + rm -rf $python_site_path/pypots + python -c "import shutil;import site;shutil.copytree('pypots',site.getsitepackages()[0]+'/pypots')" - name: Fetch the test environment details run: | @@ -66,7 +69,8 @@ jobs: - name: Test with pytest run: | - rm -rf tests/__pycache__ + python tests/global_test_config.py + rm -rf tests/__pycache__ && rm -rf tests/*/__pycache__ python -m pytest -rA tests/*/* -n auto --cov=pypots --dist=loadgroup --cov-config=.coveragerc - name: Generate the LCOV report diff --git a/environment-dev.yml b/environment-dev.yml index be9a43c5..11d3ba09 100644 --- a/environment-dev.yml +++ b/environment-dev.yml @@ -16,8 +16,9 @@ dependencies: #- conda-forge::pandas <2.0.0 #- conda-forge::h5py #- conda-forge::tensorboard - #- conda-forge::pygrinder >=0.2 + #- conda-forge::pygrinder >=0.4 #- conda-forge::tsdb >=0.2 + #- conda-forge::matplotlib #- pytorch::pytorch >=1.10.0 ## Below we install the latest pypots because we need pypots-cli in it for development. ## PyPOTS itself already includes all basic dependencies. diff --git a/pypots/classification/brits/data.py b/pypots/classification/brits/data.py index 5903d5ca..2c5c2581 100644 --- a/pypots/classification/brits/data.py +++ b/pypots/classification/brits/data.py @@ -45,4 +45,4 @@ def __init__( return_labels: bool = True, file_type: str = "h5py", ): - super().__init__(data, return_labels, file_type) + super().__init__(data, False, return_labels, file_type) diff --git a/pypots/classification/brits/model.py b/pypots/classification/brits/model.py index af4be7ef..a64cd346 100644 --- a/pypots/classification/brits/model.py +++ b/pypots/classification/brits/model.py @@ -149,7 +149,7 @@ def __init__( self.optimizer = optimizer self.optimizer.init_optimizer(self.model.parameters()) - def _assemble_input_for_training(self, data: dict) -> dict: + def _assemble_input_for_training(self, data: list) -> dict: # fetch data ( indices, @@ -179,10 +179,10 @@ def _assemble_input_for_training(self, data: dict) -> dict: } return inputs - def _assemble_input_for_validating(self, data: dict) -> dict: + def _assemble_input_for_validating(self, data: list) -> dict: return self._assemble_input_for_training(data) - def _assemble_input_for_testing(self, data: dict) -> dict: + def _assemble_input_for_testing(self, data: list) -> dict: # fetch data ( indices, diff --git a/pypots/classification/grud/data.py b/pypots/classification/grud/data.py index fff60f46..a4e4a163 100644 --- a/pypots/classification/grud/data.py +++ b/pypots/classification/grud/data.py @@ -48,7 +48,7 @@ def __init__( return_labels: bool = True, file_type: str = "h5py", ): - super().__init__(data, return_labels, file_type) + super().__init__(data, False, return_labels, file_type) self.locf = LOCF() if not isinstance(self.data, str): # data from array diff --git a/pypots/classification/grud/model.py b/pypots/classification/grud/model.py index ce5ef5ab..fc429ae1 100644 --- a/pypots/classification/grud/model.py +++ b/pypots/classification/grud/model.py @@ -133,7 +133,7 @@ def __init__( self.optimizer = optimizer self.optimizer.init_optimizer(self.model.parameters()) - def _assemble_input_for_training(self, data: dict) -> dict: + def _assemble_input_for_training(self, data: list) -> dict: # fetch data ( indices, @@ -157,10 +157,10 @@ def _assemble_input_for_training(self, data: dict) -> dict: } return inputs - def _assemble_input_for_validating(self, data: dict) -> dict: + def _assemble_input_for_validating(self, data: list) -> dict: return self._assemble_input_for_training(data) - def _assemble_input_for_testing(self, data: dict) -> dict: + def _assemble_input_for_testing(self, data: list) -> dict: ( indices, X, diff --git a/pypots/classification/raindrop/model.py b/pypots/classification/raindrop/model.py index 31520f9b..b813c105 100644 --- a/pypots/classification/raindrop/model.py +++ b/pypots/classification/raindrop/model.py @@ -173,7 +173,7 @@ def __init__( self.optimizer = optimizer self.optimizer.init_optimizer(self.model.parameters()) - def _assemble_input_for_training(self, data: dict) -> dict: + def _assemble_input_for_training(self, data: list) -> dict: # fetch data ( indices, @@ -199,10 +199,10 @@ def _assemble_input_for_training(self, data: dict) -> dict: } return inputs - def _assemble_input_for_validating(self, data: dict) -> dict: + def _assemble_input_for_validating(self, data: list) -> dict: return self._assemble_input_for_training(data) - def _assemble_input_for_testing(self, data: dict) -> dict: + def _assemble_input_for_testing(self, data: list) -> dict: ( indices, X, diff --git a/pypots/clustering/crli/data.py b/pypots/clustering/crli/data.py index 4f7e1112..6025752a 100644 --- a/pypots/clustering/crli/data.py +++ b/pypots/clustering/crli/data.py @@ -44,7 +44,7 @@ def __init__( return_labels: bool = True, file_type: str = "h5py", ): - super().__init__(data, return_labels, file_type) + super().__init__(data, False, return_labels, file_type) def _fetch_data_from_array(self, idx: int) -> Iterable: return super()._fetch_data_from_array(idx) diff --git a/pypots/clustering/vader/data.py b/pypots/clustering/vader/data.py index bdccd3cd..6a098774 100644 --- a/pypots/clustering/vader/data.py +++ b/pypots/clustering/vader/data.py @@ -44,7 +44,7 @@ def __init__( return_labels: bool = True, file_type: str = "h5py", ): - super().__init__(data, return_labels, file_type) + super().__init__(data, False, return_labels, file_type) def _fetch_data_from_array(self, idx: int) -> Iterable: return super()._fetch_data_from_array(idx) diff --git a/pypots/data/base.py b/pypots/data/base.py index 1ed4a9b9..4a65db16 100644 --- a/pypots/data/base.py +++ b/pypots/data/base.py @@ -11,8 +11,11 @@ import h5py import numpy as np import torch +from pygrinder import fill_and_get_mask_torch from torch.utils.data import Dataset +from .utils import turn_data_into_specified_dtype + # Currently we only support h5 files SUPPORTED_DATASET_FILE_TYPE = ["h5py"] @@ -48,7 +51,8 @@ class BaseDataset(Dataset): def __init__( self, data: Union[dict, str], - return_labels: bool = True, + return_X_ori: bool, + return_labels: bool, file_type: str = "h5py", ): super().__init__() @@ -56,13 +60,14 @@ def __init__( # So they are safe to use here. No need to check again. self.data = data + self.return_X_ori = return_X_ori self.return_labels = return_labels + if isinstance(self.data, str): # data from file # check if the given file type is supported assert ( file_type in SUPPORTED_DATASET_FILE_TYPE ), f"file_type should be one of {SUPPORTED_DATASET_FILE_TYPE}, but got {file_type}" - self.file_type = file_type # open the file handle @@ -74,8 +79,23 @@ def __init__( else: # data from array X = data["X"] + X_ori = None if "X_ori" not in data.keys() else data["X_ori"] y = None if "y" not in data.keys() else data["y"] - self.X, self.y = self._check_input(X, y) + self.X, self.X_ori, self.y = self._check_array_input(X, X_ori, y) + + if self.X_ori is not None and self.return_X_ori: + # Only when X_ori is given and fixed, we fill the missing values in X here in advance. + # Otherwise, we may need original X with missing values to generate X_ori, e.g. in DatasetForSAITS. + self.X, self.missing_mask = fill_and_get_mask_torch(self.X) + + self.X_ori, X_ori_missing_mask = fill_and_get_mask_torch(self.X_ori) + indicating_mask = X_ori_missing_mask - self.missing_mask + self.indicating_mask = indicating_mask.to(torch.float32) + else: + self.missing_mask = None + self.indicating_mask = None + # if return_X_ori is false, set X_ori to None as well + self.X_ori = None self.n_samples, self.n_steps, self.n_features = self._get_data_sizes() @@ -112,12 +132,14 @@ def __len__(self) -> int: return self.n_samples @staticmethod - def _check_input( + def _check_array_input( X: Union[np.ndarray, torch.Tensor, list], + X_ori: Union[np.ndarray, torch.Tensor, list], y: Optional[Union[np.ndarray, torch.Tensor, list]] = None, out_dtype: str = "tensor", ) -> Tuple[ - Union[np.ndarray, torch.Tensor, list], + Union[np.ndarray, torch.Tensor], + Union[np.ndarray, torch.Tensor], Optional[Union[np.ndarray, torch.Tensor, list]], ]: """Check value type and shape of input X and y @@ -127,6 +149,10 @@ def _check_input( X : Time-series data that must have a shape like [n_samples, expected_n_steps, expected_n_features]. + X_ori : + If X is with artificial missingness, X_ori is the original X without artificial missing values. + It must have the same shape as X. If X_ori is with original missing values, should be left as NaN. + y : Labels of time-series samples (X) that must have a shape like [n_samples] or [n_samples, n_classes]. @@ -137,6 +163,8 @@ def _check_input( ------- X : + X_ori : + y : """ @@ -145,55 +173,29 @@ def _check_input( "ndarray", ], f'out_dtype should be "tensor" or "ndarray", but got {out_dtype}' - is_list = isinstance(X, list) - is_array = isinstance(X, np.ndarray) - is_tensor = isinstance(X, torch.Tensor) - assert is_tensor or is_array or is_list, TypeError( - "X should be an instance of list/np.ndarray/torch.Tensor, " - f"but got {type(X)}" - ) - - # convert the data type if in need - if out_dtype == "tensor": - if is_list: - X = torch.tensor(X) - elif is_array: - X = torch.from_numpy(X) - else: # is tensor - pass - else: # out_dtype is ndarray - # convert to np.ndarray first for shape check - if is_list: - X = np.asarray(X) - elif is_tensor: - X = X.numpy() - else: # is ndarray - pass + # change the data type of X + X = turn_data_into_specified_dtype(X, out_dtype) + X = X.to(torch.float32) # check the shape of X here X_shape = X.shape assert len(X_shape) == 3, ( f"input should have 3 dimensions [n_samples, seq_len, n_features]," - f"but got shape={X_shape}" + f"but got X: {X_shape}" ) - + if X_ori is not None: + X_ori = turn_data_into_specified_dtype(X_ori, out_dtype) + X_ori = X_ori.to(torch.float32) + assert ( + X_shape == X_ori.shape + ), f"X and X_ori must have matched shape, but got X: f{X.shape} and X_ori: {X_ori.shape}" if y is not None: assert len(X) == len(y), ( f"lengths of X and y must match, " f"but got f{len(X)} and {len(y)}" ) - if isinstance(y, torch.Tensor): - y = y if out_dtype == "tensor" else y.numpy() - elif isinstance(y, list): - y = torch.tensor(y) if out_dtype == "tensor" else np.asarray(y) - elif isinstance(y, np.ndarray): - y = torch.from_numpy(y) if out_dtype == "tensor" else y - else: - raise TypeError( - "y should be an instance of list/np.ndarray/torch.Tensor, " - f"but got {type(y)}" - ) + y = turn_data_into_specified_dtype(y, out_dtype) - return X, y + return X, X_ori, y @abstractmethod def _fetch_data_from_array(self, idx: int) -> Iterable: @@ -210,15 +212,24 @@ def _fetch_data_from_array(self, idx: int) -> Iterable: The collated data sample, a list including all necessary sample info. """ - X = self.X[idx].to(torch.float32) - missing_mask = (~torch.isnan(X)).to(torch.float32) - X = torch.nan_to_num(X) + if self.X_ori is None: + X = self.X[idx] + X, missing_mask = fill_and_get_mask_torch(X) + else: + X = self.X[idx] + missing_mask = self.missing_mask[idx] + sample = [ torch.tensor(idx), X, missing_mask, ] + if self.X_ori is not None and self.return_X_ori: + X_ori = self.X_ori[idx] + indicating_mask = self.indicating_mask[idx] + sample.extend([X_ori, indicating_mask]) + if self.y is not None and self.return_labels: sample.append(self.y[idx].to(torch.long)) @@ -286,14 +297,19 @@ def _fetch_data_from_file(self, idx: int) -> Iterable: self.file_handle = self._open_file_handle() X = torch.from_numpy(self.file_handle["X"][idx]).to(torch.float32) - missing_mask = (~torch.isnan(X)).to(torch.float32) - X = torch.nan_to_num(X) + X, missing_mask = fill_and_get_mask_torch(X) sample = [ torch.tensor(idx), X, missing_mask, ] + if "X_ori" in self.file_handle.keys() and self.return_X_ori: + X_ori = torch.from_numpy(self.file_handle["X_ori"][idx]).to(torch.float32) + X_ori, X_ori_missing_mask = fill_and_get_mask_torch(X_ori) + indicating_mask = (X_ori_missing_mask - missing_mask).to(torch.float32) + sample.extend([X_ori, indicating_mask]) + # if the dataset has labels and is for training, then fetch it from the file if "y" in self.file_handle.keys() and self.return_labels: sample.append(self.file_handle["y"][idx].to(torch.long)) diff --git a/pypots/data/checking.py b/pypots/data/checking.py new file mode 100644 index 00000000..af22958f --- /dev/null +++ b/pypots/data/checking.py @@ -0,0 +1,21 @@ +""" + +""" + +# Created by Wenjie Du +# License: BSD-3-Clause + + +from typing import Union + +import h5py + + +def check_X_ori_in_val_set(val_set: Union[str, dict]) -> bool: + if isinstance(val_set, str): + with h5py.File(val_set, "r") as f: + return "X_ori" in f.keys() + elif isinstance(val_set, dict): + return "X_ori" in val_set.keys() + else: + raise TypeError("val_set must be a str or a Python dictionary.") diff --git a/pypots/data/generating.py b/pypots/data/generating.py index 4a3dd952..4094489c 100644 --- a/pypots/data/generating.py +++ b/pypots/data/generating.py @@ -9,8 +9,7 @@ from typing import Optional, Tuple import numpy as np -import torch -from pygrinder import mcar, masked_fill +from pygrinder import mcar from sklearn.model_selection import train_test_split from sklearn.preprocessing import StandardScaler from sklearn.utils import check_random_state @@ -273,10 +272,8 @@ def gene_random_walk( if missing_rate > 0: # create random missing values - _, train_X, missing_mask, _ = mcar(train_X, missing_rate) - train_X = masked_fill(train_X, 1 - missing_mask, torch.nan) - _, val_X, missing_mask, _ = mcar(val_X, missing_rate) - val_X = masked_fill(val_X, 1 - missing_mask, torch.nan) + train_X = mcar(train_X, missing_rate) + val_X = mcar(val_X, missing_rate) # test set is left to mask after normalization train_X = train_X.reshape(-1, n_features) @@ -306,23 +303,22 @@ def gene_random_walk( if missing_rate > 0: # mask values in the validation set as ground truth - val_X_intact, val_X, val_X_missing_mask, val_X_indicating_mask = mcar( - val_X, missing_rate - ) - val_X = masked_fill(val_X, 1 - val_X_missing_mask, torch.nan) + val_X_ori = val_X + val_X = mcar(val_X, missing_rate) # mask values in the test set as ground truth - test_X_intact, test_X, test_X_missing_mask, test_X_indicating_mask = mcar( - test_X, 0.3 - ) - test_X = masked_fill(test_X, 1 - test_X_missing_mask, torch.nan) + test_X_ori = test_X + test_X = mcar(test_X, 0.3) data["val_X"] = val_X - data["val_X_intact"] = val_X_intact - data["val_X_indicating_mask"] = val_X_indicating_mask + data["val_X_ori"] = val_X_ori + + # test_X is for model input data["test_X"] = test_X - data["test_X_intact"] = test_X_intact - data["test_X_indicating_mask"] = test_X_indicating_mask + # test_X_ori is for error calc, not for model input, hence mustn't have NaNs + data["test_X_ori"] = np.nan_to_num(test_X_ori) + data["test_X_indicating_mask"] = ~np.isnan(test_X_ori) ^ ~np.isnan(test_X) + return data @@ -414,22 +410,19 @@ def gene_physionet2012(artificially_missing_rate: float = 0.1): if artificially_missing_rate > 0: # mask values in the validation set as ground truth - val_X_intact, val_X, val_X_missing_mask, val_X_indicating_mask = mcar( - val_X, artificially_missing_rate - ) - val_X = masked_fill(val_X, 1 - val_X_missing_mask, torch.nan) - + val_X_ori = val_X + val_X = mcar(val_X, artificially_missing_rate) # mask values in the test set as ground truth - test_X_intact, test_X, test_X_missing_mask, test_X_indicating_mask = mcar( - test_X, artificially_missing_rate - ) - test_X = masked_fill(test_X, 1 - test_X_missing_mask, torch.nan) + test_X_ori = test_X + test_X = mcar(test_X, artificially_missing_rate) data["val_X"] = val_X + data["val_X_ori"] = val_X_ori + + # test_X is for model input data["test_X"] = test_X - data["test_X_intact"] = test_X_intact - data["test_X_indicating_mask"] = test_X_indicating_mask - data["val_X_intact"] = val_X_intact - data["val_X_indicating_mask"] = val_X_indicating_mask + # test_X_ori is for error calc, not for model input, hence mustn't have NaNs + data["test_X_ori"] = np.nan_to_num(test_X_ori) + data["test_X_indicating_mask"] = ~np.isnan(test_X_ori) ^ ~np.isnan(test_X) return data diff --git a/pypots/data/utils.py b/pypots/data/utils.py index e752b8a3..4cb62635 100644 --- a/pypots/data/utils.py +++ b/pypots/data/utils.py @@ -6,10 +6,30 @@ # License: BSD-3-Clause from typing import Union + import numpy as np import torch +def turn_data_into_specified_dtype( + data: Union[np.ndarray, torch.Tensor, list], + dtype: str = "tensor", +) -> Union[np.ndarray, torch.Tensor]: + """Turn the given data into the specified data type.""" + + if isinstance(data, torch.Tensor): + data = data if dtype == "tensor" else data.numpy() + elif isinstance(data, list): + data = torch.tensor(data) if dtype == "tensor" else np.asarray(data) + elif isinstance(data, np.ndarray): + data = torch.from_numpy(data) if dtype == "tensor" else data + else: + raise TypeError( + f"data should be an instance of list/np.ndarray/torch.Tensor, but got {type(data)}" + ) + return data + + def _parse_delta_torch(missing_mask: torch.Tensor) -> torch.Tensor: """Generate the time-gap matrix (i.e. the delta metrix) from the missing mask. Please refer to :cite:`che2018GRUD` for its math definition. diff --git a/pypots/imputation/base.py b/pypots/imputation/base.py index 488d5d6f..a338a49b 100644 --- a/pypots/imputation/base.py +++ b/pypots/imputation/base.py @@ -288,23 +288,24 @@ def _train_model( if val_loader is not None: self.model.eval() - imputation_collector = [] + imputation_loss_collector = [] with torch.no_grad(): for idx, data in enumerate(val_loader): inputs = self._assemble_input_for_validating(data) results = self.model.forward(inputs, training=False) - imputed_data = results["imputed_data"] - imputation_collector.append(imputed_data) - - imputation_collector = torch.cat(imputation_collector) - imputation_collector = imputation_collector.cpu().detach().numpy() - - mean_val_loss = calc_mse( - imputation_collector, - val_loader.dataset.data["X_intact"], - val_loader.dataset.data["indicating_mask"], - # the above val_loader.dataset.data is a dict containing the validation dataset - ) + imputation_mse = ( + calc_mse( + results["imputed_data"], + inputs["X_ori"], + inputs["indicating_mask"], + ) + .sum() + .detach() + .item() + ) + imputation_loss_collector.append(imputation_mse) + + mean_val_loss = np.mean(imputation_loss_collector) # save validating loss logs into the tensorboard file for every epoch if in need if self.summary_writer is not None: diff --git a/pypots/imputation/brits/data.py b/pypots/imputation/brits/data.py index 24946f71..6d33f516 100644 --- a/pypots/imputation/brits/data.py +++ b/pypots/imputation/brits/data.py @@ -8,6 +8,7 @@ from typing import Union, Iterable import torch +from pygrinder import fill_and_get_mask_torch from ...data.base import BaseDataset from ...data.utils import _parse_delta_torch @@ -43,15 +44,20 @@ class DatasetForBRITS(BaseDataset): def __init__( self, data: Union[dict, str], - return_labels: bool = True, + return_X_ori: bool, + return_labels: bool, file_type: str = "h5py", ): - super().__init__(data, return_labels, file_type) + super().__init__(data, return_X_ori, return_labels, file_type) if not isinstance(self.data, str): # calculate all delta here. - forward_missing_mask = (~torch.isnan(self.X)).type(torch.float32) - forward_X = torch.nan_to_num(self.X) + if self.X_ori is None: + forward_X, forward_missing_mask = fill_and_get_mask_torch(self.X) + else: + forward_missing_mask = self.missing_mask + forward_X = self.X + forward_delta = _parse_delta_torch(forward_missing_mask) backward_X = torch.flip(forward_X, dims=[1]) backward_missing_mask = torch.flip(forward_missing_mask, dims=[1]) @@ -110,6 +116,9 @@ def _fetch_data_from_array(self, idx: int) -> Iterable: self.processed_data["backward"]["delta"][idx], ] + if self.X_ori is not None and self.return_X_ori: + sample.extend([self.X_ori[idx], self.indicating_mask[idx]]) + if self.y is not None and self.return_labels: sample.append(self.y[idx].to(torch.long)) @@ -134,8 +143,7 @@ def _fetch_data_from_file(self, idx: int) -> Iterable: self.file_handle = self._open_file_handle() X = torch.from_numpy(self.file_handle["X"][idx]).to(torch.float32) - missing_mask = (~torch.isnan(X)).to(torch.float32) - X = torch.nan_to_num(X) + X, missing_mask = fill_and_get_mask_torch(X) forward = { "X": X, @@ -161,6 +169,11 @@ def _fetch_data_from_file(self, idx: int) -> Iterable: backward["deltas"], ] + if "X_ori" in self.file_handle.keys() and self.return_X_ori: + X_ori = torch.from_numpy(self.file_handle["X_ori"][idx]).to(torch.float32) + X_ori, indicating_mask = fill_and_get_mask_torch(X_ori) + sample.extend([X_ori, indicating_mask]) + # if the dataset has labels and is for training, then fetch it from the file if "y" in self.file_handle.keys() and self.return_labels: sample.append(torch.tensor(self.file_handle["y"][idx], dtype=torch.long)) diff --git a/pypots/imputation/brits/model.py b/pypots/imputation/brits/model.py index bbc79cc5..2fd255d7 100644 --- a/pypots/imputation/brits/model.py +++ b/pypots/imputation/brits/model.py @@ -16,7 +16,6 @@ from typing import Union, Optional -import h5py import numpy as np import torch from torch.utils.data import DataLoader @@ -24,6 +23,7 @@ from .data import DatasetForBRITS from .modules import _BRITS from ..base import BaseNNImputer +from ...data.checking import check_X_ori_in_val_set from ...optim.adam import Adam from ...optim.base import Optimizer from ...utils.logging import logger @@ -156,10 +156,39 @@ def _assemble_input_for_training(self, data: list) -> dict: return inputs def _assemble_input_for_validating(self, data: list) -> dict: - return self._assemble_input_for_training(data) + # fetch data + ( + indices, + X, + missing_mask, + deltas, + back_X, + back_missing_mask, + back_deltas, + X_ori, + indicating_mask, + ) = self._send_data_to_given_device(data) + + # assemble input data + inputs = { + "indices": indices, + "forward": { + "X": X, + "missing_mask": missing_mask, + "deltas": deltas, + }, + "backward": { + "X": back_X, + "missing_mask": back_missing_mask, + "deltas": back_deltas, + }, + "X_ori": X_ori, + "indicating_mask": indicating_mask, + } + return inputs def _assemble_input_for_testing(self, data: list) -> dict: - return self._assemble_input_for_validating(data) + return self._assemble_input_for_training(data) def fit( self, @@ -169,7 +198,7 @@ def fit( ) -> None: # Step 1: wrap the input data with classes Dataset and DataLoader training_set = DatasetForBRITS( - train_set, return_labels=False, file_type=file_type + train_set, return_X_ori=False, return_labels=False, file_type=file_type ) training_loader = DataLoader( training_set, @@ -179,30 +208,11 @@ def fit( ) val_loader = None if val_set is not None: - if isinstance(val_set, str): - with h5py.File(val_set, "r") as hf: - # Here we read the whole validation set from the file to mask a portion for validation. - # In PyPOTS, using a file usually because the data is too big. However, the validation set is - # generally shouldn't be too large. For example, we have 1 billion samples for model training. - # We won't take 20% of them as the validation set because we want as much as possible data for the - # training stage to enhance the model's generalization ability. Therefore, 100,000 representative - # samples will be enough to validate the model. - val_set = { - "X": hf["X"][:], - "X_intact": hf["X_intact"][:], - "indicating_mask": hf["indicating_mask"][:], - } - - # check if X_intact contains missing values - if np.isnan(val_set["X_intact"]).any(): - val_set["X_intact"] = np.nan_to_num(val_set["X_intact"], nan=0) - logger.warning( - "X_intact shouldn't contain missing data but has NaN values. " - "PyPOTS has imputed them with zeros by default to start the training for now. " - "Please double-check your data if you have concerns over this operation." - ) - - val_set = DatasetForBRITS(val_set, return_labels=False, file_type=file_type) + if not check_X_ori_in_val_set(val_set): + raise ValueError("val_set must contain 'X_ori' for model validation.") + val_set = DatasetForBRITS( + val_set, return_X_ori=True, return_labels=False, file_type=file_type + ) val_loader = DataLoader( val_set, batch_size=self.batch_size, @@ -224,7 +234,9 @@ def predict( file_type: str = "h5py", ) -> dict: self.model.eval() # set the model as eval status to freeze it. - test_set = DatasetForBRITS(test_set, return_labels=False, file_type=file_type) + test_set = DatasetForBRITS( + test_set, return_X_ori=False, return_labels=False, file_type=file_type + ) test_loader = DataLoader( test_set, batch_size=self.batch_size, diff --git a/pypots/imputation/brits/modules/core.py b/pypots/imputation/brits/modules/core.py index 58413c7c..d4b5ebfa 100644 --- a/pypots/imputation/brits/modules/core.py +++ b/pypots/imputation/brits/modules/core.py @@ -298,17 +298,6 @@ def reverse_tensor(tensor_): return ret def forward(self, inputs: dict, training: bool = True) -> dict: - """Forward processing of BRITS. - - Parameters - ---------- - inputs : - The input data. - - Returns - ------- - dict, A dictionary includes all results. - """ # Results from the forward RITS. ret_f = self.rits_f(inputs, "forward") # Results from the backward RITS. @@ -325,14 +314,14 @@ def forward(self, inputs: dict, training: bool = True) -> dict: consistency_loss = self._get_consistency_loss( ret_f["imputed_data"], ret_b["imputed_data"] ) - - # `loss` is always the item for backward propagating to update the model + results["consistency_loss"] = consistency_loss loss = ( consistency_loss + ret_f["reconstruction_loss"] + ret_b["reconstruction_loss"] ) - results["consistency_loss"] = consistency_loss + + # `loss` is always the item for backward propagating to update the model results["loss"] = loss return results diff --git a/pypots/imputation/csdi/data.py b/pypots/imputation/csdi/data.py index 9f9f1cab..c4aadb4e 100644 --- a/pypots/imputation/csdi/data.py +++ b/pypots/imputation/csdi/data.py @@ -7,8 +7,9 @@ from typing import Union, Iterable +import numpy as np import torch -from pygrinder import mcar +from pygrinder import fill_and_get_mask_torch from ...data.base import BaseDataset @@ -19,12 +20,34 @@ class DatasetForCSDI(BaseDataset): def __init__( self, data: Union[dict, str], - return_labels: bool = True, + target_strategy: str, + return_X_ori: bool, + return_labels: bool, file_type: str = "h5py", - rate: float = 0.1, ): - super().__init__(data, return_labels, file_type) - self.rate = rate + super().__init__(data, return_X_ori, return_labels, file_type) + self.target_strategy = target_strategy + + @staticmethod + def get_rand_mask(observed_mask): + rand_for_mask = torch.rand_like(observed_mask) * observed_mask + rand_for_mask = rand_for_mask.reshape(-1) + sample_ratio = np.random.rand() # missing ratio + num_observed = observed_mask.sum().item() + num_masked = round(num_observed * sample_ratio) + rand_for_mask[rand_for_mask.topk(num_masked).indices] = -1 + cond_mask = (rand_for_mask > 0).reshape(observed_mask.shape).float() + return cond_mask + + def get_hist_mask(self, observed_mask, for_pattern_mask): + cond_mask = observed_mask.clone() + mask_choice = np.random.rand() + if self.target_strategy == "mix" and mask_choice > 0.5: + rand_mask = self.get_rand_mask(observed_mask) + cond_mask = rand_mask + else: + cond_mask = cond_mask * for_pattern_mask + return cond_mask def _fetch_data_from_array(self, idx: int) -> Iterable: """Fetch data according to index. @@ -42,7 +65,7 @@ def _fetch_data_from_array(self, idx: int) -> Iterable: index : int tensor, The index of the sample. - X_intact : tensor, + X_ori : tensor, Original time-series for calculating mask imputation loss. X : tensor, @@ -54,36 +77,42 @@ def _fetch_data_from_array(self, idx: int) -> Iterable: indicating_mask : tensor. The mask indicates artificially missing values in X. """ - X = self.X[idx].to(torch.float32) - X_intact, X, missing_mask, indicating_mask = mcar(X, p=self.rate) - observed_data = X_intact - observed_mask = missing_mask + indicating_mask - gt_mask = missing_mask + if self.X_ori is not None and self.return_X_ori: + observed_data = self.X_ori[idx] + cond_mask = self.missing_mask[idx] + indicating_mask = self.indicating_mask[idx] + else: + observed_data = self.X[idx] + observed_data, observed_mask = fill_and_get_mask_torch(observed_data) + if self.target_strategy == "random": + cond_mask = self.get_rand_mask(observed_mask) + else: + if "for_pattern_mask" in self.data.keys(): + for_pattern_mask = torch.from_numpy( + self.data["for_pattern_mask"][idx] + ).to(torch.float32) + else: + previous_sample = self.X[idx - 1] + for_pattern_mask = (~torch.isnan(previous_sample)).to(torch.float32) + + cond_mask = self.get_hist_mask( + observed_mask, for_pattern_mask=for_pattern_mask + ) + indicating_mask = observed_mask - cond_mask + observed_tp = ( torch.arange(0, self.n_steps, dtype=torch.float32) if "time_points" not in self.data.keys() else torch.from_numpy(self.data["time_points"][idx]).to(torch.float32) ) - for_pattern_mask = ( - gt_mask - if "for_pattern_mask" not in self.data.keys() - else torch.from_numpy(self.data["for_pattern_mask"][idx]).to(torch.float32) - ) - cut_length = ( - torch.zeros(len(observed_data)).long() - if "cut_length" not in self.data.keys() - else torch.from_numpy(self.data["cut_length"][idx]).to(torch.float32) - ) sample = [ torch.tensor(idx), observed_data, - observed_mask, + indicating_mask, + cond_mask, observed_tp, - gt_mask, - for_pattern_mask, - cut_length, ] if self.y is not None and self.return_labels: @@ -109,12 +138,37 @@ def _fetch_data_from_file(self, idx: int) -> Iterable: if self.file_handle is None: self.file_handle = self._open_file_handle() - X = torch.from_numpy(self.file_handle["X"][idx]).to(torch.float32) - X_intact, X, missing_mask, indicating_mask = mcar(X, p=self.rate) + if "X_ori" in self.file_handle.keys() and self.return_X_ori: + observed_data = torch.from_numpy(self.file_handle["X_ori"][idx]).to( + torch.float32 + ) + observed_data, observed_mask = fill_and_get_mask_torch(observed_data) + X = torch.from_numpy(self.file_handle["X"][idx]).to(torch.float32) + _, cond_mask = fill_and_get_mask_torch(X) + indicating_mask = observed_mask - cond_mask + else: + observed_data = torch.from_numpy(self.file_handle["X"][idx]).to( + torch.float32 + ) + observed_data, observed_mask = fill_and_get_mask_torch(observed_data) + if self.target_strategy == "random": + cond_mask = self.get_rand_mask(observed_mask) + else: + if "for_pattern_mask" in self.data.keys(): + for_pattern_mask = torch.from_numpy( + self.file_handle["for_pattern_mask"][idx] + ).to(torch.float32) + else: + previous_sample = torch.from_numpy( + self.file_handle["X"][idx - 1] + ).to(torch.float32) + for_pattern_mask = (~torch.isnan(previous_sample)).to(torch.float32) + + cond_mask = self.get_hist_mask( + observed_mask, for_pattern_mask=for_pattern_mask + ) + indicating_mask = observed_mask - cond_mask - observed_data = X_intact - observed_mask = missing_mask + indicating_mask - gt_mask = indicating_mask observed_tp = ( torch.arange(0, self.n_steps, dtype=torch.float32) if "time_points" not in self.file_handle.keys() @@ -122,30 +176,121 @@ def _fetch_data_from_file(self, idx: int) -> Iterable: torch.float32 ) ) - for_pattern_mask = ( - gt_mask - if "for_pattern_mask" not in self.file_handle.keys() - else torch.from_numpy(self.file_handle["for_pattern_mask"][idx]).to( + + sample = [ + torch.tensor(idx), + observed_data, + indicating_mask, + cond_mask, + observed_tp, + ] + + if "y" in self.file_handle.keys() and self.return_labels: + sample.append(torch.tensor(self.file_handle["y"][idx], dtype=torch.long)) + + return sample + + +class TestDatasetForCSDI(DatasetForCSDI): + """Test dataset for CSDI model.""" + + def __init__( + self, + data: Union[dict, str], + return_X_ori: bool, + return_labels: bool, + file_type: str = "h5py", + ): + super().__init__(data, "random", return_X_ori, return_labels, file_type) + + def _fetch_data_from_array(self, idx: int) -> Iterable: + """Fetch data according to index. + + Parameters + ---------- + idx : int, + The index to fetch the specified sample. + + Returns + ------- + sample : list, + A list contains + + index : int tensor, + The index of the sample. + + X_ori : tensor, + Original time-series for calculating mask imputation loss. + + X : tensor, + Time-series data with artificially missing values for model input. + + missing_mask : tensor, + The mask records all missing values in X. + + indicating_mask : tensor. + The mask indicates artificially missing values in X. + """ + + observed_data = self.X[idx] + observed_data, observed_mask = fill_and_get_mask_torch(observed_data) + cond_mask = observed_mask + + observed_tp = ( + torch.arange(0, self.n_steps, dtype=torch.float32) + if "time_points" not in self.data.keys() + else torch.from_numpy(self.data["time_points"][idx]).to(torch.float32) + ) + + sample = [ + torch.tensor(idx), + observed_data, + cond_mask, + observed_tp, + ] + + if self.y is not None and self.return_labels: + sample.append(self.y[idx].to(torch.long)) + + return sample + + def _fetch_data_from_file(self, idx: int) -> Iterable: + """Fetch data with the lazy-loading strategy, i.e. only loading data from the file while requesting for samples. + Here the opened file handle doesn't load the entire dataset into RAM but only load the currently accessed slice. + + Parameters + ---------- + idx : int, + The index of the sample to be return. + + Returns + ------- + sample : list, + The collated data sample, a list including all necessary sample info. + """ + + if self.file_handle is None: + self.file_handle = self._open_file_handle() + + observed_data = torch.from_numpy(self.file_handle["X"][idx]).to(torch.float32) + observed_data, observed_mask = fill_and_get_mask_torch(observed_data) + cond_mask = observed_mask + + observed_tp = ( + torch.arange(0, self.n_steps, dtype=torch.float32) + if "time_points" not in self.file_handle.keys() + else torch.from_numpy(self.file_handle["time_points"][idx]).to( torch.float32 ) ) - cut_length = ( - torch.zeros(len(observed_data)).long() - if "cut_length" not in self.file_handle.keys() - else torch.from_numpy(self.file_handle["cut_length"][idx]).to(torch.float32) - ) sample = [ torch.tensor(idx), observed_data, - observed_mask, + cond_mask, observed_tp, - gt_mask, - for_pattern_mask, - cut_length, ] - # if the dataset has labels and is for training, then fetch it from the file if "y" in self.file_handle.keys() and self.return_labels: sample.append(torch.tensor(self.file_handle["y"][idx], dtype=torch.long)) diff --git a/pypots/imputation/csdi/model.py b/pypots/imputation/csdi/model.py index 17de692e..d4707a14 100644 --- a/pypots/imputation/csdi/model.py +++ b/pypots/imputation/csdi/model.py @@ -16,7 +16,6 @@ import os from typing import Union, Optional -import h5py import numpy as np import torch from torch.utils.data import DataLoader @@ -26,9 +25,10 @@ except ImportError: pass -from .data import DatasetForCSDI +from .data import DatasetForCSDI, TestDatasetForCSDI from .modules import _CSDI from ..base import BaseNNImputer +from ...data.checking import check_X_ori_in_val_set from ...optim.adam import Adam from ...optim.base import Optimizer from ...utils.logging import logger @@ -160,6 +160,7 @@ def __init__( ) assert target_strategy in ["mix", "random"] assert schedule in ["quad", "linear"] + self.target_strategy = target_strategy # set up the model self.model = _CSDI( @@ -171,7 +172,6 @@ def __init__( d_feature_embedding, d_diffusion_embedding, is_unconditional, - target_strategy, n_diffusion_steps, schedule, beta_start, @@ -187,21 +187,17 @@ def __init__( def _assemble_input_for_training(self, data: list) -> dict: ( indices, - observed_data, - observed_mask, + X_ori, + indicating_mask, + cond_mask, observed_tp, - gt_mask, - for_pattern_mask, - cut_length, ) = self._send_data_to_given_device(data) inputs = { - "observed_data": observed_data.permute(0, 2, 1), - "observed_mask": observed_mask.permute(0, 2, 1), + "X_ori": X_ori.permute(0, 2, 1), + "indicating_mask": indicating_mask.permute(0, 2, 1), + "cond_mask": cond_mask.permute(0, 2, 1), "observed_tp": observed_tp, - "gt_mask": gt_mask.permute(0, 2, 1), - "for_pattern_mask": for_pattern_mask, - "cut_length": cut_length, } return inputs @@ -209,7 +205,19 @@ def _assemble_input_for_validating(self, data) -> dict: return self._assemble_input_for_training(data) def _assemble_input_for_testing(self, data) -> dict: - return self._assemble_input_for_validating(data) + ( + indices, + X, + con_mask, + observed_tp, + ) = self._send_data_to_given_device(data) + + inputs = { + "X": X.permute(0, 2, 1), + "con_mask": con_mask.permute(0, 2, 1), + "observed_tp": observed_tp, + } + return inputs def _train_model( self, @@ -327,7 +335,11 @@ def fit( ) -> None: # Step 1: wrap the input data with classes Dataset and DataLoader training_set = DatasetForCSDI( - train_set, return_labels=False, file_type=file_type + train_set, + self.target_strategy, + return_X_ori=False, + return_labels=False, + file_type=file_type, ) training_loader = DataLoader( training_set, @@ -337,30 +349,15 @@ def fit( ) val_loader = None if val_set is not None: - if isinstance(val_set, str): - with h5py.File(val_set, "r") as hf: - # Here we read the whole validation set from the file to mask a portion for validation. - # In PyPOTS, using a file usually because the data is too big. However, the validation set is - # generally shouldn't be too large. For example, we have 1 billion samples for model training. - # We won't take 20% of them as the validation set because we want as much as possible data for the - # training stage to enhance the model's generalization ability. Therefore, 100,000 representative - # samples will be enough to validate the model. - val_set = { - "X": hf["X"][:], - "X_intact": hf["X_intact"][:], - "indicating_mask": hf["indicating_mask"][:], - } - - # check if X_intact contains missing values - if np.isnan(val_set["X_intact"]).any(): - val_set["X_intact"] = np.nan_to_num(val_set["X_intact"], nan=0) - logger.warning( - "X_intact shouldn't contain missing data but has NaN values. " - "PyPOTS has imputed them with zeros by default to start the training for now. " - "Please double-check your data if you have concerns over this operation." - ) - - val_set = DatasetForCSDI(val_set, return_labels=False, file_type=file_type) + if not check_X_ori_in_val_set(val_set): + raise ValueError("val_set must contain 'X_ori' for model validation.") + val_set = DatasetForCSDI( + val_set, + self.target_strategy, + return_X_ori=False, + return_labels=False, + file_type=file_type, + ) val_loader = DataLoader( val_set, batch_size=self.batch_size, @@ -410,7 +407,9 @@ def predict( """ # Step 1: wrap the input data with classes Dataset and DataLoader self.model.eval() # set the model as eval status to freeze it. - test_set = DatasetForCSDI(test_set, return_labels=False, file_type=file_type) + test_set = TestDatasetForCSDI( + test_set, return_X_ori=False, return_labels=False, file_type=file_type + ) test_loader = DataLoader( test_set, batch_size=self.batch_size, diff --git a/pypots/imputation/csdi/modules/core.py b/pypots/imputation/csdi/modules/core.py index 7c811539..a2c968e1 100644 --- a/pypots/imputation/csdi/modules/core.py +++ b/pypots/imputation/csdi/modules/core.py @@ -19,7 +19,6 @@ def __init__( d_feature_embedding, d_diffusion_embedding, is_unconditional, - target_strategy, n_diffusion_steps, schedule, beta_start, @@ -31,7 +30,6 @@ def __init__( self.d_time_embedding = d_time_embedding self.d_feature_embedding = d_feature_embedding self.is_unconditional = is_unconditional - self.target_strategy = target_strategy self.n_channels = n_channels self.n_diffusion_steps = n_diffusion_steps @@ -87,33 +85,6 @@ def time_embedding(pos, d_model=128): pe[:, :, 1::2] = torch.cos(position * div_term) return pe - @staticmethod - def get_rand_mask(observed_mask): - rand_for_mask = torch.rand_like(observed_mask) * observed_mask - rand_for_mask = rand_for_mask.reshape(len(rand_for_mask), -1) - for i in range(len(observed_mask)): - sample_ratio = np.random.rand() # missing ratio - num_observed = observed_mask[i].sum().item() - num_masked = round(num_observed * sample_ratio) - rand_for_mask[i][rand_for_mask[i].topk(num_masked).indices] = -1 - cond_mask = (rand_for_mask > 0).reshape(observed_mask.shape).float() - return cond_mask - - def get_hist_mask(self, observed_mask, for_pattern_mask=None): - if for_pattern_mask is None: - for_pattern_mask = observed_mask - if self.target_strategy == "mix": - rand_mask = self.get_rand_mask(observed_mask) - - cond_mask = observed_mask.clone() - for i in range(len(cond_mask)): - mask_choice = np.random.rand() - if self.target_strategy == "mix" and mask_choice > 0.5: - cond_mask[i] = rand_mask[i] - else: # draw another sample for hist mask (i-1 corresponds to another sample) - cond_mask[i] = cond_mask[i] * for_pattern_mask[i - 1] - return cond_mask - def get_side_info(self, observed_tp, cond_mask): B, K, L = cond_mask.shape device = observed_tp.device @@ -139,18 +110,18 @@ def get_side_info(self, observed_tp, cond_mask): return side_info def calc_loss_valid( - self, observed_data, cond_mask, observed_mask, side_info, is_train + self, observed_data, cond_mask, indicating_mask, side_info, is_train ): loss_sum = 0 for t in range(self.n_diffusion_steps): # calculate loss for all t loss = self.calc_loss( - observed_data, cond_mask, observed_mask, side_info, is_train, set_t=t + observed_data, cond_mask, indicating_mask, side_info, is_train, set_t=t ) loss_sum += loss.detach() return loss_sum / self.n_diffusion_steps def calc_loss( - self, observed_data, cond_mask, observed_mask, side_info, is_train, set_t=-1 + self, observed_data, cond_mask, indicating_mask, side_info, is_train, set_t=-1 ): B, K, L = observed_data.shape device = observed_data.device @@ -169,7 +140,7 @@ def calc_loss( predicted = self.diff_model(total_input, side_info, t) # (B,K,L) - target_mask = observed_mask - cond_mask + target_mask = indicating_mask residual = (noise - predicted) * target_mask num_eval = target_mask.sum() loss = (residual**2).sum() / (num_eval if num_eval > 0 else 1) @@ -234,37 +205,43 @@ def impute(self, observed_data, cond_mask, side_info, n_sampling_times): return imputed_samples def forward(self, inputs, training=True, n_sampling_times=1): - (observed_data, observed_mask, observed_tp, gt_mask, for_pattern_mask,) = ( - inputs["observed_data"], - inputs["observed_mask"], - inputs["observed_tp"], - inputs["gt_mask"], - inputs["for_pattern_mask"], - ) - - if not training: - cond_mask = gt_mask - elif self.target_strategy != "random": - cond_mask = self.get_hist_mask( - observed_mask, for_pattern_mask=for_pattern_mask + results = {} + if training: # for training + (observed_data, indicating_mask, cond_mask, observed_tp) = ( + inputs["X_ori"], + inputs["indicating_mask"], + inputs["cond_mask"], + inputs["observed_tp"], ) - else: - cond_mask = self.get_rand_mask(observed_mask) - - side_info = self.get_side_info(observed_tp, cond_mask) - - loss_func = self.calc_loss if training else self.calc_loss_valid - - # `loss` is always the item for backward propagating to update the model - loss = loss_func(observed_data, cond_mask, observed_mask, side_info, training) - results = {"loss": loss} - - if not training and n_sampling_times > 0: + side_info = self.get_side_info(observed_tp, cond_mask) + training_loss = self.calc_loss( + observed_data, cond_mask, indicating_mask, side_info, training + ) + results["loss"] = training_loss + elif not training and n_sampling_times == 0: # for validating + (observed_data, indicating_mask, cond_mask, observed_tp) = ( + inputs["X_ori"], + inputs["indicating_mask"], + inputs["cond_mask"], + inputs["observed_tp"], + ) + side_info = self.get_side_info(observed_tp, cond_mask) + validating_loss = self.calc_loss_valid( + observed_data, cond_mask, indicating_mask, side_info, training + ) + results["loss"] = validating_loss + elif not training and n_sampling_times > 0: # for testing + observed_data, cond_mask, observed_tp = ( + inputs["X"], + inputs["con_mask"], + inputs["observed_tp"], + ) + side_info = self.get_side_info(observed_tp, cond_mask) samples = self.impute( observed_data, cond_mask, side_info, n_sampling_times ) # (bz,n_sampling,K,L) repeated_obs = observed_data.unsqueeze(1).repeat(1, n_sampling_times, 1, 1) - repeated_mask = gt_mask.unsqueeze(1).repeat(1, n_sampling_times, 1, 1) + repeated_mask = cond_mask.unsqueeze(1).repeat(1, n_sampling_times, 1, 1) imputed_data = repeated_obs + samples * (1 - repeated_mask) results["imputed_data"] = imputed_data.permute( diff --git a/pypots/imputation/gpvae/data.py b/pypots/imputation/gpvae/data.py index 8193fb5e..24b5739b 100644 --- a/pypots/imputation/gpvae/data.py +++ b/pypots/imputation/gpvae/data.py @@ -8,7 +8,7 @@ from typing import Union, Iterable import torch - +from pygrinder import fill_and_get_mask_torch from ...data.base import BaseDataset @@ -42,20 +42,11 @@ class DatasetForGPVAE(BaseDataset): def __init__( self, data: Union[dict, str], - return_labels: bool = True, + return_X_ori: bool, + return_labels: bool, file_type: str = "h5py", ): - super().__init__(data, return_labels, file_type) - - if not isinstance(self.data, str): - # calculate all delta here. - missing_mask = (~torch.isnan(self.X)).type(torch.float32) - X = torch.nan_to_num(self.X).to(torch.float32) - - self.processed_data = { - "X": X, - "missing_mask": missing_mask, - } + super().__init__(data, return_X_ori, return_labels, file_type) def _fetch_data_from_array(self, idx: int) -> Iterable: """Fetch data from self.X if it is given. @@ -85,12 +76,17 @@ def _fetch_data_from_array(self, idx: int) -> Iterable: label (optional) : tensor, The target label of the time-series sample. """ - sample = [ - torch.tensor(idx), - # for forward - self.processed_data["X"][idx], - self.processed_data["missing_mask"][idx], - ] + X = self.X[idx] + + if self.X_ori is not None and self.return_X_ori: + X = self.X[idx] + missing_mask = self.missing_mask[idx] + X_ori = self.X_ori[idx] + indicating_mask = self.indicating_mask[idx] + sample = [torch.tensor(idx), X, missing_mask, X_ori, indicating_mask] + else: + X, missing_mask = fill_and_get_mask_torch(X) + sample = [torch.tensor(idx), X, missing_mask] if self.y is not None and self.return_labels: sample.append(self.y[idx].to(torch.long)) @@ -115,15 +111,17 @@ def _fetch_data_from_file(self, idx: int) -> Iterable: if self.file_handle is None: self.file_handle = self._open_file_handle() - X = torch.from_numpy(self.file_handle["X"][idx]).to(torch.float32) - missing_mask = (~torch.isnan(X)).to(torch.float32) - X = torch.nan_to_num(X) - - sample = [ - torch.tensor(idx), - X, - missing_mask, - ] + if "X_ori" in self.file_handle.keys() and self.return_X_ori: + X = torch.from_numpy(self.file_handle["X"][idx]).to(torch.float32) + X_ori = torch.from_numpy(self.file_handle["X_ori"][idx]).to(torch.float32) + X_ori, X_ori_missing_mask = fill_and_get_mask_torch(X_ori) + X, missing_mask = fill_and_get_mask_torch(X) + indicating_mask = (X_ori_missing_mask - missing_mask).to(torch.float32) + sample = [torch.tensor(idx), X, missing_mask, X_ori, indicating_mask] + else: + X = torch.from_numpy(self.file_handle["X"][idx]).to(torch.float32) + X, missing_mask = fill_and_get_mask_torch(X) + sample = [torch.tensor(idx), X, missing_mask] # if the dataset has labels and is for training, then fetch it from the file if "y" in self.file_handle.keys() and self.return_labels: diff --git a/pypots/imputation/gpvae/model.py b/pypots/imputation/gpvae/model.py index 63a1c8f6..90699151 100644 --- a/pypots/imputation/gpvae/model.py +++ b/pypots/imputation/gpvae/model.py @@ -12,7 +12,6 @@ from typing import Union, Optional -import h5py import numpy as np import torch from torch.utils.data import DataLoader @@ -20,6 +19,7 @@ from .data import DatasetForGPVAE from .modules import _GPVAE from ..base import BaseNNImputer +from ...data.checking import check_X_ori_in_val_set from ...optim.adam import Adam from ...optim.base import Optimizer from ...utils.logging import logger @@ -172,10 +172,28 @@ def _assemble_input_for_training(self, data: list) -> dict: return inputs def _assemble_input_for_validating(self, data: list) -> dict: - return self._assemble_input_for_training(data) + # fetch data + ( + indices, + X, + missing_mask, + X_ori, + indicating_mask, + ) = self._send_data_to_given_device(data) + + # assemble input data + inputs = { + "indices": indices, + "X": X, + "missing_mask": missing_mask, + "X_ori": X_ori, + "indicating_mask": indicating_mask, + } + + return inputs def _assemble_input_for_testing(self, data: list) -> dict: - return self._assemble_input_for_validating(data) + return self._assemble_input_for_training(data) def fit( self, @@ -185,7 +203,7 @@ def fit( ) -> None: # Step 1: wrap the input data with classes Dataset and DataLoader training_set = DatasetForGPVAE( - train_set, return_labels=False, file_type=file_type + train_set, return_X_ori=False, return_labels=False, file_type=file_type ) training_loader = DataLoader( training_set, @@ -195,30 +213,11 @@ def fit( ) val_loader = None if val_set is not None: - if isinstance(val_set, str): - with h5py.File(val_set, "r") as hf: - # Here we read the whole validation set from the file to mask a portion for validation. - # In PyPOTS, using a file usually because the data is too big. However, the validation set is - # generally shouldn't be too large. For example, we have 1 billion samples for model training. - # We won't take 20% of them as the validation set because we want as much as possible data for the - # training stage to enhance the model's generalization ability. Therefore, 100,000 representative - # samples will be enough to validate the model. - val_set = { - "X": hf["X"][:], - "X_intact": hf["X_intact"][:], - "indicating_mask": hf["indicating_mask"][:], - } - - # check if X_intact contains missing values - if np.isnan(val_set["X_intact"]).any(): - val_set["X_intact"] = np.nan_to_num(val_set["X_intact"], nan=0) - logger.warning( - "X_intact shouldn't contain missing data but has NaN values. " - "PyPOTS has imputed them with zeros by default to start the training for now. " - "Please double-check your data if you have concerns over this operation." - ) - - val_set = DatasetForGPVAE(val_set, return_labels=False, file_type=file_type) + if not check_X_ori_in_val_set(val_set): + raise ValueError("val_set must contain 'X_ori' for model validation.") + val_set = DatasetForGPVAE( + val_set, return_X_ori=True, return_labels=False, file_type=file_type + ) val_loader = DataLoader( val_set, batch_size=self.batch_size, @@ -240,7 +239,9 @@ def predict( file_type="h5py", ) -> dict: self.model.eval() # set the model as eval status to freeze it. - test_set = DatasetForGPVAE(test_set, return_labels=False, file_type=file_type) + test_set = DatasetForGPVAE( + test_set, return_X_ori=False, return_labels=False, file_type=file_type + ) test_loader = DataLoader( test_set, batch_size=self.batch_size, diff --git a/pypots/imputation/locf/model.py b/pypots/imputation/locf/model.py index b13083e8..82b72dc5 100644 --- a/pypots/imputation/locf/model.py +++ b/pypots/imputation/locf/model.py @@ -9,6 +9,7 @@ import warnings from typing import Union, Optional +import h5py import numpy as np import torch @@ -199,8 +200,11 @@ def predict( It should be a dictionary including keys as 'imputation', 'classification', 'clustering', and 'forecasting'. For sure, only the keys that relevant tasks are supported by the model will be returned. """ - assert not isinstance(test_set, str) - X = test_set["X"] + if isinstance(test_set, str): + with h5py.File(test_set, "r") as f: + X = f["X"][:] + else: + X = test_set["X"] assert len(X.shape) == 3, ( f"Input X should have 3 dimensions [n_samples, n_steps, n_features], " diff --git a/pypots/imputation/mrnn/data.py b/pypots/imputation/mrnn/data.py index e2cee8d8..22d9e5d9 100644 --- a/pypots/imputation/mrnn/data.py +++ b/pypots/imputation/mrnn/data.py @@ -40,7 +40,8 @@ class DatasetForMRNN(DatasetForBRITS): def __init__( self, data: Union[dict, str], - return_labels: bool = True, + return_X_ori: bool, + return_labels: bool, file_type: str = "h5py", ): - super().__init__(data, return_labels, file_type) + super().__init__(data, return_X_ori, return_labels, file_type) diff --git a/pypots/imputation/mrnn/model.py b/pypots/imputation/mrnn/model.py index 6a82ce1c..cbb095c7 100644 --- a/pypots/imputation/mrnn/model.py +++ b/pypots/imputation/mrnn/model.py @@ -10,7 +10,6 @@ from typing import Union, Optional -import h5py import numpy as np import torch from torch.utils.data import DataLoader @@ -18,6 +17,7 @@ from .data import DatasetForMRNN from .modules import _MRNN from ..base import BaseNNImputer +from ...data.checking import check_X_ori_in_val_set from ...optim.adam import Adam from ...optim.base import Optimizer from ...utils.logging import logger @@ -151,10 +151,40 @@ def _assemble_input_for_training(self, data: list) -> dict: return inputs def _assemble_input_for_validating(self, data: list) -> dict: - return self._assemble_input_for_training(data) + # fetch data + ( + indices, + X, + missing_mask, + deltas, + back_X, + back_missing_mask, + back_deltas, + X_ori, + indicating_mask, + ) = self._send_data_to_given_device(data) + + # assemble input data + inputs = { + "indices": indices, + "forward": { + "X": X, + "missing_mask": missing_mask, + "deltas": deltas, + }, + "backward": { + "X": back_X, + "missing_mask": back_missing_mask, + "deltas": back_deltas, + }, + "X_ori": X_ori, + "indicating_mask": indicating_mask, + } + + return inputs def _assemble_input_for_testing(self, data: list) -> dict: - return self._assemble_input_for_validating(data) + return self._assemble_input_for_training(data) def fit( self, @@ -164,7 +194,7 @@ def fit( ) -> None: # Step 1: wrap the input data with classes Dataset and DataLoader training_set = DatasetForMRNN( - train_set, return_labels=False, file_type=file_type + train_set, return_X_ori=False, return_labels=False, file_type=file_type ) training_loader = DataLoader( training_set, @@ -174,30 +204,11 @@ def fit( ) val_loader = None if val_set is not None: - if isinstance(val_set, str): - with h5py.File(val_set, "r") as hf: - # Here we read the whole validation set from the file to mask a portion for validation. - # In PyPOTS, using a file usually because the data is too big. However, the validation set is - # generally shouldn't be too large. For example, we have 1 billion samples for model training. - # We won't take 20% of them as the validation set because we want as much as possible data for the - # training stage to enhance the model's generalization ability. Therefore, 100,000 representative - # samples will be enough to validate the model. - val_set = { - "X": hf["X"][:], - "X_intact": hf["X_intact"][:], - "indicating_mask": hf["indicating_mask"][:], - } - - # check if X_intact contains missing values - if np.isnan(val_set["X_intact"]).any(): - val_set["X_intact"] = np.nan_to_num(val_set["X_intact"], nan=0) - logger.warning( - "X_intact shouldn't contain missing data but has NaN values. " - "PyPOTS has imputed them with zeros by default to start the training for now. " - "Please double-check your data if you have concerns over this operation." - ) - - val_set = DatasetForMRNN(val_set, return_labels=False, file_type=file_type) + if not check_X_ori_in_val_set(val_set): + raise ValueError("val_set must contain 'X_ori' for model validation.") + val_set = DatasetForMRNN( + val_set, return_X_ori=True, return_labels=False, file_type=file_type + ) val_loader = DataLoader( val_set, batch_size=self.batch_size, @@ -219,7 +230,9 @@ def predict( file_type="h5py", ) -> dict: self.model.eval() # set the model as eval status to freeze it. - test_set = DatasetForMRNN(test_set, return_labels=False, file_type=file_type) + test_set = DatasetForMRNN( + test_set, return_X_ori=False, return_labels=False, file_type=file_type + ) test_loader = DataLoader( test_set, batch_size=self.batch_size, diff --git a/pypots/imputation/saits/data.py b/pypots/imputation/saits/data.py index 3eb5e375..aeae871a 100644 --- a/pypots/imputation/saits/data.py +++ b/pypots/imputation/saits/data.py @@ -8,7 +8,7 @@ from typing import Union, Iterable import torch -from pygrinder import mcar +from pygrinder import mcar, fill_and_get_mask_torch from ...data.base import BaseDataset @@ -53,11 +53,12 @@ class DatasetForSAITS(BaseDataset): def __init__( self, data: Union[dict, str], - return_labels: bool = True, + return_X_ori: bool, + return_labels: bool, file_type: str = "h5py", rate: float = 0.2, ): - super().__init__(data, return_labels, file_type) + super().__init__(data, return_X_ori, return_labels, file_type) self.rate = rate def _fetch_data_from_array(self, idx: int) -> Iterable: @@ -76,7 +77,7 @@ def _fetch_data_from_array(self, idx: int) -> Iterable: index : int tensor, The index of the sample. - X_intact : tensor, + X_ori : tensor, Original time-series for calculating mask imputation loss. X : tensor, @@ -88,14 +89,24 @@ def _fetch_data_from_array(self, idx: int) -> Iterable: indicating_mask : tensor. The mask indicates artificially missing values in X. """ - X = self.X[idx].to(torch.float32) - X_intact, X, missing_mask, indicating_mask = mcar(X, p=self.rate) + + if self.X_ori is not None and self.return_X_ori: + X = self.X[idx] + X_ori = self.X_ori[idx] + missing_mask = self.missing_mask[idx] + indicating_mask = self.indicating_mask[idx] + else: + X_ori = self.X[idx] + X = mcar(X_ori, p=self.rate) + X, missing_mask = fill_and_get_mask_torch(X) + X_ori, X_ori_missing_mask = fill_and_get_mask_torch(X_ori) + indicating_mask = (X_ori_missing_mask - missing_mask).to(torch.float32) sample = [ torch.tensor(idx), - X_intact, X, missing_mask, + X_ori, indicating_mask, ] @@ -122,16 +133,20 @@ def _fetch_data_from_file(self, idx: int) -> Iterable: if self.file_handle is None: self.file_handle = self._open_file_handle() - X = torch.from_numpy(self.file_handle["X"][idx]).to(torch.float32) - X_intact, X, missing_mask, indicating_mask = mcar(X, p=self.rate) - - sample = [ - torch.tensor(idx), - X_intact, - X, - missing_mask, - indicating_mask, - ] + if "X_ori" in self.file_handle.keys() and self.return_X_ori: + X = torch.from_numpy(self.file_handle["X"][idx]).to(torch.float32) + X_ori = torch.from_numpy(self.file_handle["X_ori"][idx]).to(torch.float32) + X_ori, X_ori_missing_mask = fill_and_get_mask_torch(X_ori) + X, missing_mask = fill_and_get_mask_torch(X) + indicating_mask = (X_ori_missing_mask - missing_mask).to(torch.float32) + else: + X_ori = torch.from_numpy(self.file_handle["X"][idx]).to(torch.float32) + X = mcar(X_ori, p=self.rate) + X_ori, X_ori_missing_mask = fill_and_get_mask_torch(X_ori) + X, missing_mask = fill_and_get_mask_torch(X) + indicating_mask = (X_ori_missing_mask - missing_mask).to(torch.float32) + + sample = [torch.tensor(idx), X, missing_mask, X_ori, indicating_mask] # if the dataset has labels and is for training, then fetch it from the file if "y" in self.file_handle.keys() and self.return_labels: diff --git a/pypots/imputation/saits/model.py b/pypots/imputation/saits/model.py index 99952645..278bd898 100644 --- a/pypots/imputation/saits/model.py +++ b/pypots/imputation/saits/model.py @@ -15,7 +15,6 @@ from typing import Union, Optional, Callable -import h5py import numpy as np import torch from torch.utils.data import DataLoader @@ -24,6 +23,7 @@ from .modules import _SAITS from ..base import BaseNNImputer from ...data.base import BaseDataset +from ...data.checking import check_X_ori_in_val_set from ...optim.adam import Adam from ...optim.base import Optimizer from ...utils.logging import logger @@ -206,22 +206,25 @@ def __init__( def _assemble_input_for_training(self, data: list) -> dict: ( indices, - X_intact, X, missing_mask, + X_ori, indicating_mask, ) = self._send_data_to_given_device(data) inputs = { "X": X, - "X_intact": X_intact, "missing_mask": missing_mask, + "X_ori": X_ori, "indicating_mask": indicating_mask, } return inputs - def _assemble_input_for_validating(self, data) -> dict: + def _assemble_input_for_validating(self, data: list) -> dict: + return self._assemble_input_for_training(data) + + def _assemble_input_for_testing(self, data: list) -> dict: indices, X, missing_mask = self._send_data_to_given_device(data) inputs = { @@ -230,9 +233,6 @@ def _assemble_input_for_validating(self, data) -> dict: } return inputs - def _assemble_input_for_testing(self, data) -> dict: - return self._assemble_input_for_validating(data) - def fit( self, train_set: Union[dict, str], @@ -241,7 +241,7 @@ def fit( ) -> None: # Step 1: wrap the input data with classes Dataset and DataLoader training_set = DatasetForSAITS( - train_set, return_labels=False, file_type=file_type + train_set, return_X_ori=False, return_labels=False, file_type=file_type ) training_loader = DataLoader( training_set, @@ -251,30 +251,11 @@ def fit( ) val_loader = None if val_set is not None: - if isinstance(val_set, str): - with h5py.File(val_set, "r") as hf: - # Here we read the whole validation set from the file to mask a portion for validation. - # In PyPOTS, using a file usually because the data is too big. However, the validation set is - # generally shouldn't be too large. For example, we have 1 billion samples for model training. - # We won't take 20% of them as the validation set because we want as much as possible data for the - # training stage to enhance the model's generalization ability. Therefore, 100,000 representative - # samples will be enough to validate the model. - val_set = { - "X": hf["X"][:], - "X_intact": hf["X_intact"][:], - "indicating_mask": hf["indicating_mask"][:], - } - - # check if X_intact contains missing values - if np.isnan(val_set["X_intact"]).any(): - val_set["X_intact"] = np.nan_to_num(val_set["X_intact"], nan=0) - logger.warning( - "X_intact shouldn't contain missing data but has NaN values. " - "PyPOTS has imputed them with zeros by default to start the training for now. " - "Please double-check your data if you have concerns over this operation." - ) - - val_set = BaseDataset(val_set, return_labels=False, file_type=file_type) + if not check_X_ori_in_val_set(val_set): + raise ValueError("val_set must contain 'X_ori' for model validation.") + val_set = DatasetForSAITS( + val_set, return_X_ori=True, return_labels=False, file_type=file_type + ) val_loader = DataLoader( val_set, batch_size=self.batch_size, @@ -328,7 +309,9 @@ def predict( """ # Step 1: wrap the input data with classes Dataset and DataLoader self.model.eval() # set the model as eval status to freeze it. - test_set = BaseDataset(test_set, return_labels=False, file_type=file_type) + test_set = BaseDataset( + test_set, return_X_ori=False, return_labels=False, file_type=file_type + ) test_loader = DataLoader( test_set, batch_size=self.batch_size, diff --git a/pypots/imputation/saits/modules/core.py b/pypots/imputation/saits/modules/core.py index 05dd0287..7d2a97a6 100644 --- a/pypots/imputation/saits/modules/core.py +++ b/pypots/imputation/saits/modules/core.py @@ -155,7 +155,10 @@ def _process( ) def forward( - self, inputs: dict, diagonal_attention_mask: bool = False, training: bool = True + self, + inputs: dict, + diagonal_attention_mask: bool = False, + training: bool = True, ) -> dict: X, masks = inputs["X"], inputs["missing_mask"] @@ -190,7 +193,7 @@ def forward( ORT_loss /= 3 MIT_loss = self.customized_loss_func( - X_tilde_3, inputs["X_intact"], inputs["indicating_mask"] + X_tilde_3, inputs["X_ori"], inputs["indicating_mask"] ) results["ORT_loss"] = ORT_loss diff --git a/pypots/imputation/timesnet/data.py b/pypots/imputation/timesnet/data.py index 5d95170d..d30f8a53 100644 --- a/pypots/imputation/timesnet/data.py +++ b/pypots/imputation/timesnet/data.py @@ -16,8 +16,9 @@ class DatasetForTimesNet(DatasetForSAITS): def __init__( self, data: Union[dict, str], - return_labels: bool = True, + return_X_ori: bool, + return_labels: bool, file_type: str = "h5py", rate: float = 0.2, ): - super().__init__(data, return_labels, file_type, rate) + super().__init__(data, return_X_ori, return_labels, file_type, rate) diff --git a/pypots/imputation/timesnet/model.py b/pypots/imputation/timesnet/model.py index 408f6cfd..60132b39 100644 --- a/pypots/imputation/timesnet/model.py +++ b/pypots/imputation/timesnet/model.py @@ -15,7 +15,6 @@ from typing import Union, Optional -import h5py import numpy as np import torch from torch.utils.data import DataLoader @@ -24,6 +23,7 @@ from .modules.core import _TimesNet from ..base import BaseNNImputer from ...data.base import BaseDataset +from ...data.checking import check_X_ori_in_val_set from ...optim.adam import Adam from ...optim.base import Optimizer from ...utils.logging import logger @@ -172,25 +172,28 @@ def __init__( self.optimizer = optimizer self.optimizer.init_optimizer(self.model.parameters()) - def _assemble_input_for_training(self, data: dict) -> dict: + def _assemble_input_for_training(self, data: list) -> dict: ( indices, - X_intact, X, missing_mask, + X_ori, indicating_mask, ) = self._send_data_to_given_device(data) inputs = { "X": X, - "X_intact": X_intact, "missing_mask": missing_mask, + "X_ori": X_ori, "indicating_mask": indicating_mask, } return inputs def _assemble_input_for_validating(self, data: list) -> dict: + return self._assemble_input_for_training(data) + + def _assemble_input_for_testing(self, data: list) -> dict: indices, X, missing_mask = self._send_data_to_given_device(data) inputs = { @@ -200,9 +203,6 @@ def _assemble_input_for_validating(self, data: list) -> dict: return inputs - def _assemble_input_for_testing(self, data: list) -> dict: - return self._assemble_input_for_validating(data) - def fit( self, train_set: Union[dict, str], @@ -211,7 +211,7 @@ def fit( ) -> None: # Step 1: wrap the input data with classes Dataset and DataLoader training_set = DatasetForTimesNet( - train_set, return_labels=False, file_type=file_type + train_set, return_X_ori=False, return_labels=False, file_type=file_type ) training_loader = DataLoader( training_set, @@ -221,30 +221,11 @@ def fit( ) val_loader = None if val_set is not None: - if isinstance(val_set, str): - with h5py.File(val_set, "r") as hf: - # Here we read the whole validation set from the file to mask a portion for validation. - # In PyPOTS, using a file usually because the data is too big. However, the validation set is - # generally shouldn't be too large. For example, we have 1 billion samples for model training. - # We won't take 20% of them as the validation set because we want as much as possible data for the - # training stage to enhance the model's generalization ability. Therefore, 100,000 representative - # samples will be enough to validate the model. - val_set = { - "X": hf["X"][:], - "X_intact": hf["X_intact"][:], - "indicating_mask": hf["indicating_mask"][:], - } - - # check if X_intact contains missing values - if np.isnan(val_set["X_intact"]).any(): - val_set["X_intact"] = np.nan_to_num(val_set["X_intact"], nan=0) - logger.warning( - "X_intact shouldn't contain missing data but has NaN values. " - "PyPOTS has imputed them with zeros by default to start the training for now. " - "Please double-check your data if you have concerns over this operation." - ) - - val_set = BaseDataset(val_set, return_labels=False, file_type=file_type) + if not check_X_ori_in_val_set(val_set): + raise ValueError("val_set must contain 'X_ori' for model validation.") + val_set = DatasetForTimesNet( + val_set, return_X_ori=True, return_labels=False, file_type=file_type + ) val_loader = DataLoader( val_set, batch_size=self.batch_size, @@ -289,7 +270,9 @@ def predict( """ # Step 1: wrap the input data with classes Dataset and DataLoader self.model.eval() # set the model as eval status to freeze it. - test_set = BaseDataset(test_set, return_labels=False, file_type=file_type) + test_set = BaseDataset( + test_set, return_X_ori=False, return_labels=False, file_type=file_type + ) test_loader = DataLoader( test_set, batch_size=self.batch_size, diff --git a/pypots/imputation/timesnet/modules/core.py b/pypots/imputation/timesnet/modules/core.py index 87dea28c..fd2e4cf0 100644 --- a/pypots/imputation/timesnet/modules/core.py +++ b/pypots/imputation/timesnet/modules/core.py @@ -77,7 +77,7 @@ def forward(self, inputs: dict, training: bool = True) -> dict: if training: # `loss` is always the item for backward propagating to update the model - loss = calc_mse(dec_out, inputs["X_intact"], inputs["indicating_mask"]) + loss = calc_mse(dec_out, inputs["X_ori"], inputs["indicating_mask"]) results["loss"] = loss return results diff --git a/pypots/imputation/transformer/data.py b/pypots/imputation/transformer/data.py index 0409a22c..6974991d 100644 --- a/pypots/imputation/transformer/data.py +++ b/pypots/imputation/transformer/data.py @@ -14,8 +14,9 @@ class DatasetForTransformer(DatasetForSAITS): def __init__( self, data: Union[dict, str], - return_labels: bool = True, + return_X_ori: bool, + return_labels: bool, file_type: str = "h5py", rate: float = 0.2, ): - super().__init__(data, return_labels, file_type, rate) + super().__init__(data, return_X_ori, return_labels, file_type, rate) diff --git a/pypots/imputation/transformer/model.py b/pypots/imputation/transformer/model.py index cfe0b009..3205aa91 100644 --- a/pypots/imputation/transformer/model.py +++ b/pypots/imputation/transformer/model.py @@ -15,7 +15,6 @@ from typing import Union, Optional -import h5py import numpy as np import torch from torch.utils.data import DataLoader @@ -24,6 +23,7 @@ from .modules import _TransformerEncoder from ..base import BaseNNImputer from ...data.base import BaseDataset +from ...data.checking import check_X_ori_in_val_set from ...optim.adam import Adam from ...optim.base import Optimizer from ...utils.logging import logger @@ -200,25 +200,28 @@ def __init__( self.optimizer = optimizer self.optimizer.init_optimizer(self.model.parameters()) - def _assemble_input_for_training(self, data: dict) -> dict: + def _assemble_input_for_training(self, data: list) -> dict: ( indices, - X_intact, X, missing_mask, + X_ori, indicating_mask, ) = self._send_data_to_given_device(data) inputs = { "X": X, - "X_intact": X_intact, "missing_mask": missing_mask, + "X_ori": X_ori, "indicating_mask": indicating_mask, } return inputs def _assemble_input_for_validating(self, data: list) -> dict: + return self._assemble_input_for_training(data) + + def _assemble_input_for_testing(self, data: list) -> dict: indices, X, missing_mask = self._send_data_to_given_device(data) inputs = { @@ -228,9 +231,6 @@ def _assemble_input_for_validating(self, data: list) -> dict: return inputs - def _assemble_input_for_testing(self, data: list) -> dict: - return self._assemble_input_for_validating(data) - def fit( self, train_set: Union[dict, str], @@ -239,7 +239,7 @@ def fit( ) -> None: # Step 1: wrap the input data with classes Dataset and DataLoader training_set = DatasetForTransformer( - train_set, return_labels=False, file_type=file_type + train_set, return_X_ori=False, return_labels=False, file_type=file_type ) training_loader = DataLoader( training_set, @@ -249,30 +249,11 @@ def fit( ) val_loader = None if val_set is not None: - if isinstance(val_set, str): - with h5py.File(val_set, "r") as hf: - # Here we read the whole validation set from the file to mask a portion for validation. - # In PyPOTS, using a file usually because the data is too big. However, the validation set is - # generally shouldn't be too large. For example, we have 1 billion samples for model training. - # We won't take 20% of them as the validation set because we want as much as possible data for the - # training stage to enhance the model's generalization ability. Therefore, 100,000 representative - # samples will be enough to validate the model. - val_set = { - "X": hf["X"][:], - "X_intact": hf["X_intact"][:], - "indicating_mask": hf["indicating_mask"][:], - } - - # check if X_intact contains missing values - if np.isnan(val_set["X_intact"]).any(): - val_set["X_intact"] = np.nan_to_num(val_set["X_intact"], nan=0) - logger.warning( - "X_intact shouldn't contain missing data but has NaN values. " - "PyPOTS has imputed them with zeros by default to start the training for now. " - "Please double-check your data if you have concerns over this operation." - ) - - val_set = BaseDataset(val_set, return_labels=False, file_type=file_type) + if not check_X_ori_in_val_set(val_set): + raise ValueError("val_set must contain 'X_ori' for model validation.") + val_set = DatasetForTransformer( + val_set, return_X_ori=True, return_labels=False, file_type=file_type + ) val_loader = DataLoader( val_set, batch_size=self.batch_size, @@ -294,7 +275,9 @@ def predict( file_type: str = "h5py", ) -> dict: self.model.eval() # set the model as eval status to freeze it. - test_set = BaseDataset(test_set, return_labels=False, file_type=file_type) + test_set = BaseDataset( + test_set, return_X_ori=False, return_labels=False, file_type=file_type + ) test_loader = DataLoader( test_set, batch_size=self.batch_size, diff --git a/pypots/imputation/transformer/modules/core.py b/pypots/imputation/transformer/modules/core.py index 682d5bd2..4712c5bb 100644 --- a/pypots/imputation/transformer/modules/core.py +++ b/pypots/imputation/transformer/modules/core.py @@ -91,7 +91,7 @@ def forward(self, inputs: dict, training: bool = True) -> dict: if training: ORT_loss = calc_mae(learned_presentation, X, masks) MIT_loss = calc_mae( - learned_presentation, inputs["X_intact"], inputs["indicating_mask"] + learned_presentation, inputs["X_ori"], inputs["indicating_mask"] ) results["ORT_loss"] = ORT_loss results["MIT_loss"] = MIT_loss diff --git a/pypots/imputation/usgan/data.py b/pypots/imputation/usgan/data.py index 3a4b1637..58e035c3 100644 --- a/pypots/imputation/usgan/data.py +++ b/pypots/imputation/usgan/data.py @@ -40,7 +40,8 @@ class DatasetForUSGAN(DatasetForBRITS): def __init__( self, data: Union[dict, str], - return_labels: bool = True, + return_X_ori: bool, + return_labels: bool, file_type: str = "h5py", ): - super().__init__(data, return_labels, file_type) + super().__init__(data, return_X_ori, return_labels, file_type) diff --git a/pypots/imputation/usgan/model.py b/pypots/imputation/usgan/model.py index 96d84bb7..287958b4 100644 --- a/pypots/imputation/usgan/model.py +++ b/pypots/imputation/usgan/model.py @@ -12,7 +12,6 @@ import os from typing import Union, Optional -import h5py import numpy as np import torch from torch.utils.data import DataLoader @@ -20,6 +19,7 @@ from .data import DatasetForUSGAN from .modules import _USGAN from ..base import BaseNNImputer +from ...data.checking import check_X_ori_in_val_set from ...optim.adam import Adam from ...optim.base import Optimizer from ...utils.logging import logger @@ -195,10 +195,40 @@ def _assemble_input_for_training(self, data: list) -> dict: return inputs def _assemble_input_for_validating(self, data: list) -> dict: - return self._assemble_input_for_training(data) + # fetch data + ( + indices, + X, + missing_mask, + deltas, + back_X, + back_missing_mask, + back_deltas, + X_ori, + indicating_mask, + ) = self._send_data_to_given_device(data) + + # assemble input data + inputs = { + "indices": indices, + "forward": { + "X": X, + "missing_mask": missing_mask, + "deltas": deltas, + }, + "backward": { + "X": back_X, + "missing_mask": back_missing_mask, + "deltas": back_deltas, + }, + "X_ori": X_ori, + "indicating_mask": indicating_mask, + } + + return inputs def _assemble_input_for_testing(self, data: list) -> dict: - return self._assemble_input_for_validating(data) + return self._assemble_input_for_training(data) def _train_model( self, @@ -350,7 +380,7 @@ def fit( ) -> None: # Step 1: wrap the input data with classes Dataset and DataLoader training_set = DatasetForUSGAN( - train_set, return_labels=False, file_type=file_type + train_set, return_X_ori=False, return_labels=False, file_type=file_type ) training_loader = DataLoader( training_set, @@ -360,30 +390,11 @@ def fit( ) val_loader = None if val_set is not None: - if isinstance(val_set, str): - with h5py.File(val_set, "r") as hf: - # Here we read the whole validation set from the file to mask a portion for validation. - # In PyPOTS, using a file usually because the data is too big. However, the validation set is - # generally shouldn't be too large. For example, we have 1 billion samples for model training. - # We won't take 20% of them as the validation set because we want as much as possible data for the - # training stage to enhance the model's generalization ability. Therefore, 100,000 representative - # samples will be enough to validate the model. - val_set = { - "X": hf["X"][:], - "X_intact": hf["X_intact"][:], - "indicating_mask": hf["indicating_mask"][:], - } - - # check if X_intact contains missing values - if np.isnan(val_set["X_intact"]).any(): - val_set["X_intact"] = np.nan_to_num(val_set["X_intact"], nan=0) - logger.warning( - "X_intact shouldn't contain missing data but has NaN values. " - "PyPOTS has imputed them with zeros by default to start the training for now. " - "Please double-check your data if you have concerns over this operation." - ) - - val_set = DatasetForUSGAN(val_set, return_labels=False, file_type=file_type) + if not check_X_ori_in_val_set(val_set): + raise ValueError("val_set must contain 'X_ori' for model validation.") + val_set = DatasetForUSGAN( + val_set, return_X_ori=True, return_labels=False, file_type=file_type + ) val_loader = DataLoader( val_set, batch_size=self.batch_size, @@ -405,7 +416,9 @@ def predict( file_type="h5py", ) -> dict: self.model.eval() # set the model as eval status to freeze it. - test_set = DatasetForUSGAN(test_set, return_labels=False, file_type=file_type) + test_set = DatasetForUSGAN( + test_set, return_X_ori=False, return_labels=False, file_type=file_type + ) test_loader = DataLoader( test_set, batch_size=self.batch_size, diff --git a/pypots/utils/visual/__init__.py b/pypots/utils/visual/__init__.py new file mode 100644 index 00000000..39b029fd --- /dev/null +++ b/pypots/utils/visual/__init__.py @@ -0,0 +1,11 @@ +""" +Visualization utilities. +""" + +# Created by Wenjie Du +# License: BSD-3-Clause + +__all__ = [ + "clustering", + "data", +] diff --git a/pypots/utils/visual/data.py b/pypots/utils/visual/data.py index ada81ff2..2e86b607 100644 --- a/pypots/utils/visual/data.py +++ b/pypots/utils/visual/data.py @@ -5,19 +5,23 @@ # Created by Jun Wang and Wenjie Du # License: BSD-3-Clause +from typing import Optional + import matplotlib.pyplot as plt import numpy as np import pandas as pd +from ..logging import logger + def plot_data( - vals_obs, - vals_eval, - vals_imputed, - dataidx: int = None, - nrows: int = 10, - ncols: int = 4, - figsize=None, + X: np.ndarray, + X_ori: np.ndarray, + X_imputed: np.ndarray, + sample_idx: Optional[int] = None, + n_rows: int = 10, + n_cols: int = 4, + fig_size: Optional[list] = None, ): """Plot the imputed values, the observed values, and the evaluated values of one multivariate timeseries. The observed values are marked with red 'x', the evaluated values are marked with blue 'o', @@ -25,48 +29,58 @@ def plot_data( Parameters ---------- - vals_obs : ndarray, + X : ndarray, The observed values - vals_eval : ndarray, + X_ori : ndarray, The evaluated values - vals_imputed : ndarray, + X_imputed : ndarray, The imputed values - dataidx : int, - The index of the sample to be plotted + sample_idx : int, + The index of the sample to be plotted. + If None, a randomly-selected sample will be plotted for visualization. - nrows : int, + n_rows : int, The number of rows in the plot - ncols : int, + n_cols : int, The number of columns in the plot - figsize : list, + fig_size : list, The size of the figure """ - n_samples, n_steps, n_features = vals_obs.shape + vals_shape = X.shape + assert ( + len(vals_shape) == 3 + ), "vals_obs should be a 3D array of shape (n_samples, n_steps, n_features)" + n_samples, n_steps, n_features = vals_shape - if dataidx is None: - dataidx = np.random.randint(low=0, high=n_samples) - if figsize is None: - figsize = [24, 36] + if sample_idx is None: + sample_idx = np.random.randint(low=0, high=n_samples) + logger.warning( + f"No sample index is specified, a random sample {sample_idx} is selected for visualization." + ) - n_k = nrows * ncols + if fig_size is None: + fig_size = [24, 36] + + n_k = n_rows * n_cols K = np.min([n_features, n_k]) L = n_steps plt.rcParams["font.size"] = 16 - fig, axes = plt.subplots(nrows=nrows, ncols=ncols, figsize=(figsize[0], figsize[1])) - # fig.delaxes(axes[-1][-1]) + fig, axes = plt.subplots( + nrows=n_rows, ncols=n_cols, figsize=(fig_size[0], fig_size[1]) + ) for k in range(K): - df = pd.DataFrame({"x": np.arange(0, L), "val": vals_imputed[dataidx, :, k]}) - df1 = pd.DataFrame({"x": np.arange(0, L), "val": vals_obs[dataidx, :, k]}) - df2 = pd.DataFrame({"x": np.arange(0, L), "val": vals_eval[dataidx, :, k]}) - row = k // ncols - col = k % ncols + df = pd.DataFrame({"x": np.arange(0, L), "val": X_imputed[sample_idx, :, k]}) + df1 = pd.DataFrame({"x": np.arange(0, L), "val": X[sample_idx, :, k]}) + df2 = pd.DataFrame({"x": np.arange(0, L), "val": X_ori[sample_idx, :, k]}) + row = k // n_cols + col = k % n_cols axes[row][col].plot(df1.x, df1.val, color="r", marker="x", linestyle="None") axes[row][col].plot(df2.x, df2.val, color="b", marker="o", linestyle="None") axes[row][col].plot(df.x, df.val, color="g", linestyle="solid") @@ -74,58 +88,80 @@ def plot_data( plt.setp(axes[row, 0], ylabel="value") if row == -1: plt.setp(axes[-1, col], xlabel="time") + plt.show() -def plot_missingness(mask, t_max=1, t_min=0, dataidx=None): +def plot_missingness( + missing_mask, + min_step=0, + max_step=1, + sample_idx: Optional[int] = None, +): """Plot the missingness pattern of one multivariate timeseries. For each feature, the observed timestamp is marked with blue '|'. The distribution of sequence lengths is also plotted. Hereby, the sequence length is defined as the number of observed timestamps in one feature. Parameters ---------- - mask : ndarray, - The mask matrix of one multivariate timeseries + missing_mask : ndarray, + The missing mask of multivariate time series. - t_max : int, - The maximum time + min_step : int, + The minimum time step for visualization. - t_min : int, - The minimum time + max_step : int, + The maximum time step for visualization. - dataidx : int, - The index of the sample to be plotted + sample_idx : int, + The index of the sample to be plotted, if None, a randomly-selected sample will be plotted for visualization. """ - n_s, n_l, n_c = mask.shape + mask_shape = missing_mask.shape + if len(mask_shape) == 3: + n_samples, n_steps, n_features = missing_mask.shape + if sample_idx is None: + sample_idx = np.random.randint(low=0, high=n_samples) + logger.warning( + f"No sample index is specified, a random sample {sample_idx} is selected for visualization." + ) + mask_sample_for_vis = np.transpose(missing_mask[sample_idx], (1, 0)) + elif len(mask_shape) == 2: + n_steps, n_features = missing_mask.shape + mask_sample_for_vis = np.transpose(missing_mask, (1, 0)) + else: + raise ValueError( + f"missing_mask should be missing masks of multiple time series samples of " + f"shape (n_samples, n_steps, n_features), " + f"or of a single time series sample of shape (n_steps, n_features). " + f"But got invalid shape of missing_mask: {mask_shape}." + ) + time = np.repeat( - np.repeat(np.linspace(0, t_max, n_l).reshape(1, n_l, 1), axis=2, repeats=n_c), - axis=0, - repeats=n_s, + np.linspace(0, max_step, n_steps).reshape(n_steps, 1), + axis=1, + repeats=n_features, ) - if dataidx is None: - dataidx = np.random.randint(low=0, high=n_s) + plot_sample = np.transpose(time, (1, 0)) fig, axes = plt.subplots(figsize=[12, 3.5], dpi=200, nrows=1, ncols=2) plt.subplots_adjust(hspace=0.1) seq_len = [] - sample = np.transpose(time[dataidx], (1, 0)) - mask_s = np.transpose(mask[dataidx], (1, 0)) - for feature_idx in range(n_c): - t = sample[feature_idx][mask_s[feature_idx] == 1] - axes[0].scatter(t, np.ones_like(t) * (feature_idx), alpha=1, c="C0", marker="|") + for feature_idx in range(n_features): + t = plot_sample[feature_idx][mask_sample_for_vis[feature_idx] == 1] + axes[0].scatter(t, np.ones_like(t) * feature_idx, alpha=1, c="C0", marker="|") seq_len.append(len(t)) axes[0].set_title("Visualization of arrival times", fontsize=9) axes[0].set_xlabel("Time", fontsize=7) axes[0].set_ylabel("Features #", fontsize=7) - axes[0].set_xlim(-1, n_l) - # axes[0].set_ylim(0, n_c-1) + axes[0].set_xlim(-1, n_steps) + # axes[0].set_ylim(0, n_features-1) axes[0].tick_params(axis="both", labelsize=7) axes[1].set_title("Distribution of sequence lengths", fontsize=9) axes[1].hist( seq_len, - bins=n_l, + bins=n_steps, color="C1", - range=(t_min, t_max), + range=(min_step, max_step), ) axes[1].set_xlabel(r"Sequence length", fontsize=7) axes[1].set_ylabel("Frequency", fontsize=7) diff --git a/requirements.txt b/requirements.txt index 0cbb7491..0523e1ba 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,10 +3,11 @@ numpy scikit-learn +matplotlib scipy torch>=1.10.0 tensorboard pandas<2.0.0 -pygrinder>=0.2 +pygrinder>=0.4 tsdb>=0.2 h5py diff --git a/setup.cfg b/setup.cfg index cce7fe81..bbc3f761 100644 --- a/setup.cfg +++ b/setup.cfg @@ -27,13 +27,14 @@ exclude = pypots/*/template basic = numpy scikit-learn + matplotlib pandas<2.0.0 torch>=1.10.0 tensorboard scipy h5py tsdb>=0.2 - pygrinder>=0.2 + pygrinder>=0.4 # dependencies that are optional, torch-geometric are only needed for model Raindrop # but its installation takes too much time diff --git a/setup.py b/setup.py index 75b95445..4b42b34e 100644 --- a/setup.py +++ b/setup.py @@ -46,11 +46,12 @@ install_requires=[ "numpy", "scikit-learn", + "matplotlib", "scipy", "torch>=1.10.0", "tensorboard", "pandas<2.0.0", - "pygrinder>=0.2", + "pygrinder>=0.4", "tsdb>=0.2", "h5py", ], diff --git a/tests/classification/brits.py b/tests/classification/brits.py index c7815f5c..a68442c0 100644 --- a/tests/classification/brits.py +++ b/tests/classification/brits.py @@ -14,16 +14,17 @@ from pypots.optim import Adam from pypots.utils.logging import logger from pypots.utils.metrics import calc_binary_classification_metrics -from tests.classification.config import ( +from tests.global_test_config import ( + DATA, EPOCHS, + DEVICE, TRAIN_SET, VAL_SET, TEST_SET, + H5_TRAIN_SET_PATH, + H5_VAL_SET_PATH, + H5_TEST_SET_PATH, RESULT_SAVING_DIR_FOR_CLASSIFICATION, -) -from tests.global_test_config import ( - DATA, - DEVICE, check_tb_and_model_checkpoints_existence, ) @@ -57,14 +58,16 @@ def test_0_fit(self): @pytest.mark.xdist_group(name="classification-brits") def test_1_classify(self): - predictions = self.brits.classify(TEST_SET) - metrics = calc_binary_classification_metrics(predictions, DATA["test_y"]) + results = self.brits.predict(TEST_SET) + metrics = calc_binary_classification_metrics( + results["classification"], DATA["test_y"] + ) logger.info( - f'ROC_AUC: {metrics["roc_auc"]}, \n' - f'PR_AUC: {metrics["pr_auc"]},\n' - f'F1: {metrics["f1"]},\n' - f'Precision: {metrics["precision"]},\n' - f'Recall: {metrics["recall"]},\n' + f'BRITS ROC_AUC: {metrics["roc_auc"]}, ' + f'PR_AUC: {metrics["pr_auc"]}, ' + f'F1: {metrics["f1"]}, ' + f'Precision: {metrics["precision"]}, ' + f'Recall: {metrics["recall"]}' ) assert metrics["roc_auc"] >= 0.5, "ROC-AUC < 0.5" @@ -99,6 +102,22 @@ def test_3_saving_path(self): # test loading the saved model, not necessary, but need to test self.brits.load(saved_model_path) + @pytest.mark.xdist_group(name="classification-brits") + def test_4_lazy_loading(self): + self.brits.fit(H5_TRAIN_SET_PATH, H5_VAL_SET_PATH) + results = self.brits.predict(H5_TEST_SET_PATH) + metrics = calc_binary_classification_metrics( + results["classification"], DATA["test_y"] + ) + logger.info( + f'Lazy-loading BRITS ROC_AUC: {metrics["roc_auc"]}, ' + f'PR_AUC: {metrics["pr_auc"]}, ' + f'F1: {metrics["f1"]}, ' + f'Precision: {metrics["precision"]}, ' + f'Recall: {metrics["recall"]}' + ) + assert metrics["roc_auc"] >= 0.5, "ROC-AUC < 0.5" + if __name__ == "__main__": unittest.main() diff --git a/tests/classification/config.py b/tests/classification/config.py deleted file mode 100644 index 727ff405..00000000 --- a/tests/classification/config.py +++ /dev/null @@ -1,21 +0,0 @@ -""" -Test configs for classification models. -""" - -# Created by Wenjie Du -# License: BSD-3-Clause - -import os - -from tests.global_test_config import ( - DATA, - RESULT_SAVING_DIR, -) - -EPOCHS = 5 - -TRAIN_SET = {"X": DATA["train_X"], "y": DATA["train_y"]} -VAL_SET = {"X": DATA["val_X"], "y": DATA["val_y"]} -TEST_SET = {"X": DATA["test_X"]} - -RESULT_SAVING_DIR_FOR_CLASSIFICATION = os.path.join(RESULT_SAVING_DIR, "classification") diff --git a/tests/classification/grud.py b/tests/classification/grud.py index 37bad931..756451d4 100644 --- a/tests/classification/grud.py +++ b/tests/classification/grud.py @@ -14,16 +14,17 @@ from pypots.optim import Adam from pypots.utils.logging import logger from pypots.utils.metrics import calc_binary_classification_metrics -from tests.classification.config import ( +from tests.global_test_config import ( + DATA, EPOCHS, + DEVICE, TRAIN_SET, VAL_SET, TEST_SET, + H5_TRAIN_SET_PATH, + H5_VAL_SET_PATH, + H5_TEST_SET_PATH, RESULT_SAVING_DIR_FOR_CLASSIFICATION, -) -from tests.global_test_config import ( - DATA, - DEVICE, check_tb_and_model_checkpoints_existence, ) @@ -59,11 +60,11 @@ def test_1_classify(self): predictions = self.grud.classify(TEST_SET) metrics = calc_binary_classification_metrics(predictions, DATA["test_y"]) logger.info( - f'ROC_AUC: {metrics["roc_auc"]}, \n' - f'PR_AUC: {metrics["pr_auc"]},\n' - f'F1: {metrics["f1"]},\n' - f'Precision: {metrics["precision"]},\n' - f'Recall: {metrics["recall"]},\n' + f'GRU-D ROC_AUC: {metrics["roc_auc"]}, ' + f'PR_AUC: {metrics["pr_auc"]}, ' + f'F1: {metrics["f1"]}, ' + f'Precision: {metrics["precision"]}, ' + f'Recall: {metrics["recall"]}' ) assert metrics["roc_auc"] >= 0.5, "ROC-AUC < 0.5" @@ -98,6 +99,22 @@ def test_3_saving_path(self): # test loading the saved model, not necessary, but need to test self.grud.load(saved_model_path) + @pytest.mark.xdist_group(name="classification-grud") + def test_4_lazy_loading(self): + self.grud.fit(H5_TRAIN_SET_PATH, H5_VAL_SET_PATH) + results = self.grud.predict(H5_TEST_SET_PATH) + metrics = calc_binary_classification_metrics( + results["classification"], DATA["test_y"] + ) + logger.info( + f'GRU-D ROC_AUC: {metrics["roc_auc"]}, ' + f'PR_AUC: {metrics["pr_auc"]}, ' + f'F1: {metrics["f1"]}, ' + f'Precision: {metrics["precision"]}, ' + f'Recall: {metrics["recall"]}' + ) + assert metrics["roc_auc"] >= 0.5, "ROC-AUC < 0.5" + if __name__ == "__main__": unittest.main() diff --git a/tests/classification/raindrop.py b/tests/classification/raindrop.py index 967f73ec..530a13d7 100644 --- a/tests/classification/raindrop.py +++ b/tests/classification/raindrop.py @@ -13,16 +13,17 @@ from pypots.classification import Raindrop from pypots.utils.logging import logger from pypots.utils.metrics import calc_binary_classification_metrics -from tests.classification.config import ( +from tests.global_test_config import ( + DATA, EPOCHS, + DEVICE, TRAIN_SET, VAL_SET, TEST_SET, + H5_TRAIN_SET_PATH, + H5_VAL_SET_PATH, + H5_TEST_SET_PATH, RESULT_SAVING_DIR_FOR_CLASSIFICATION, -) -from tests.global_test_config import ( - DATA, - DEVICE, check_tb_and_model_checkpoints_existence, ) @@ -62,11 +63,11 @@ def test_1_classify(self): predictions = self.raindrop.classify(TEST_SET) metrics = calc_binary_classification_metrics(predictions, DATA["test_y"]) logger.info( - f'ROC_AUC: {metrics["roc_auc"]}, \n' - f'PR_AUC: {metrics["pr_auc"]},\n' - f'F1: {metrics["f1"]},\n' - f'Precision: {metrics["precision"]},\n' - f'Recall: {metrics["recall"]},\n' + f'Lazy-loading Raindrop ROC_AUC: {metrics["roc_auc"]}, ' + f'PR_AUC: {metrics["pr_auc"]}, ' + f'F1: {metrics["f1"]}, ' + f'Precision: {metrics["precision"]}, ' + f'Recall: {metrics["recall"]}' ) assert metrics["roc_auc"] >= 0.5, "ROC-AUC < 0.5" @@ -103,6 +104,22 @@ def test_3_saving_path(self): # test loading the saved model, not necessary, but need to test self.raindrop.load(saved_model_path) + @pytest.mark.xdist_group(name="classification-raindrop") + def test_4_lazy_loading(self): + self.raindrop.fit(H5_TRAIN_SET_PATH, H5_VAL_SET_PATH) + results = self.raindrop.predict(H5_TEST_SET_PATH) + metrics = calc_binary_classification_metrics( + results["classification"], DATA["test_y"] + ) + logger.info( + f'Lazy-loading Raindrop ROC_AUC: {metrics["roc_auc"]}, ' + f'PR_AUC: {metrics["pr_auc"]}, ' + f'F1: {metrics["f1"]}, ' + f'Precision: {metrics["precision"]}, ' + f'Recall: {metrics["recall"]}' + ) + assert metrics["roc_auc"] >= 0.5, "ROC-AUC < 0.5" + if __name__ == "__main__": unittest.main() diff --git a/tests/clustering/config.py b/tests/clustering/config.py deleted file mode 100644 index 0d07d56a..00000000 --- a/tests/clustering/config.py +++ /dev/null @@ -1,22 +0,0 @@ -""" -Test configs for clustering models. -""" - -# Created by Wenjie Du -# License: BSD-3-Clause - -import os - -from tests.global_test_config import ( - DATA, - RESULT_SAVING_DIR, -) - - -EPOCHS = 5 - -TRAIN_SET = {"X": DATA["train_X"]} -VAL_SET = {"X": DATA["val_X"]} -TEST_SET = {"X": DATA["test_X"]} - -RESULT_SAVING_DIR_FOR_CLUSTERING = os.path.join(RESULT_SAVING_DIR, "clustering") diff --git a/tests/clustering/crli.py b/tests/clustering/crli.py index 63960619..6a36d670 100644 --- a/tests/clustering/crli.py +++ b/tests/clustering/crli.py @@ -18,16 +18,17 @@ calc_external_cluster_validation_metrics, calc_internal_cluster_validation_metrics, ) -from tests.clustering.config import ( +from tests.global_test_config import ( + DATA, EPOCHS, + DEVICE, TRAIN_SET, VAL_SET, TEST_SET, + H5_TRAIN_SET_PATH, + H5_VAL_SET_PATH, + H5_TEST_SET_PATH, RESULT_SAVING_DIR_FOR_CLUSTERING, -) -from tests.global_test_config import ( - DATA, - DEVICE, check_tb_and_model_checkpoints_existence, ) @@ -131,8 +132,8 @@ def test_2_cluster(self): internal_metrics = calc_internal_cluster_validation_metrics( clustering_results["latent_vars"]["clustering_latent"], DATA["test_y"] ) - logger.info(f"CRLI-GRU: {external_metrics}") - logger.info(f"CRLI-GRU:{internal_metrics}") + logger.info(f"CRLI-GRU external_metrics: {external_metrics}") + logger.info(f"CRLI-GRU internal_metrics: {internal_metrics}") # LSTM cell clustering_results = self.crli_lstm.predict(TEST_SET, return_latent_vars=True) @@ -142,8 +143,8 @@ def test_2_cluster(self): internal_metrics = calc_internal_cluster_validation_metrics( clustering_results["latent_vars"]["clustering_latent"], DATA["test_y"] ) - logger.info(f"CRLI-LSTM: {external_metrics}") - logger.info(f"CRLI-LSTM: {internal_metrics}") + logger.info(f"CRLI-LSTM external_metrics: {external_metrics}") + logger.info(f"CRLI-LSTM internal_metrics: {internal_metrics}") @pytest.mark.xdist_group(name="clustering-crli") def test_3_saving_path(self): @@ -162,6 +163,21 @@ def test_3_saving_path(self): # test loading the saved model, not necessary, but need to test self.crli_gru.load(saved_model_path) + @pytest.mark.xdist_group(name="clustering-crli") + def test_4_lazy_loading(self): + self.crli_gru.fit(H5_TRAIN_SET_PATH, H5_VAL_SET_PATH) + clustering_results = self.crli_gru.predict( + H5_TEST_SET_PATH, return_latent_vars=True + ) + external_metrics = calc_external_cluster_validation_metrics( + clustering_results["clustering"], DATA["test_y"] + ) + internal_metrics = calc_internal_cluster_validation_metrics( + clustering_results["latent_vars"]["clustering_latent"], DATA["test_y"] + ) + logger.info(f"Lazy-loading CRLI-GRU external_metrics: {external_metrics}") + logger.info(f"Lazy-loading CRLI-GRU internal_metrics: {internal_metrics}") + if __name__ == "__main__": unittest.main() diff --git a/tests/clustering/vader.py b/tests/clustering/vader.py index d5143367..65d3ac34 100644 --- a/tests/clustering/vader.py +++ b/tests/clustering/vader.py @@ -19,16 +19,17 @@ calc_external_cluster_validation_metrics, calc_internal_cluster_validation_metrics, ) -from tests.clustering.config import ( +from tests.global_test_config import ( + DATA, EPOCHS, + DEVICE, TRAIN_SET, VAL_SET, TEST_SET, + H5_TRAIN_SET_PATH, + H5_VAL_SET_PATH, + H5_TEST_SET_PATH, RESULT_SAVING_DIR_FOR_CLUSTERING, -) -from tests.global_test_config import ( - DATA, - DEVICE, check_tb_and_model_checkpoints_existence, ) @@ -71,8 +72,8 @@ def test_1_cluster(self): internal_metrics = calc_internal_cluster_validation_metrics( clustering_results["latent_vars"]["z"], DATA["test_y"] ) - logger.info(f"{external_metrics}") - logger.info(f"{internal_metrics}") + logger.info(f"VaDER external_metrics: {external_metrics}") + logger.info(f"VaDER internal_metrics: {internal_metrics}") except np.linalg.LinAlgError as e: logger.error( f"{e}\n" @@ -110,6 +111,21 @@ def test_3_saving_path(self): # test loading the saved model, not necessary, but need to test self.vader.load(saved_model_path) + @pytest.mark.xdist_group(name="clustering-vader") + def test_4_lazy_loading(self): + self.vader.fit(H5_TRAIN_SET_PATH, H5_VAL_SET_PATH) + clustering_results = self.vader.predict( + H5_TEST_SET_PATH, return_latent_vars=True + ) + external_metrics = calc_external_cluster_validation_metrics( + clustering_results["clustering"], DATA["test_y"] + ) + internal_metrics = calc_internal_cluster_validation_metrics( + clustering_results["latent_vars"]["z"], DATA["test_y"] + ) + logger.info(f"Lazy-loading VaDER external_metrics: {external_metrics}") + logger.info(f"Lazy-loading VaDER internal_metrics: {internal_metrics}") + if __name__ == "__main__": unittest.main() diff --git a/tests/data/lazy_loading_strategy.py b/tests/data/lazy_loading_strategy.py deleted file mode 100644 index 43379dd6..00000000 --- a/tests/data/lazy_loading_strategy.py +++ /dev/null @@ -1,133 +0,0 @@ -""" -Test cases for data classes with the lazy-loading strategy of reading from files. -""" - -# Created by Wenjie Du -# License: BSD-3-Clause - -import os -import unittest - -import pytest - -from pypots.classification import BRITS, GRUD -from pypots.data.saving import save_dict_into_h5 -from pypots.imputation import SAITS -from pypots.utils.logging import logger -from tests.global_test_config import DATA, DATA_SAVING_DIR - -TRAIN_SET_NAME = "train_set.h5" -TRAIN_SET_PATH = f"{DATA_SAVING_DIR}/{TRAIN_SET_NAME}" -VAL_SET_NAME = "val_set.h5" -VAL_SET_PATH = f"{DATA_SAVING_DIR}/{VAL_SET_NAME}" -TEST_SET_NAME = "test_set.h5" -TEST_SET_PATH = f"{DATA_SAVING_DIR}/{TEST_SET_NAME}" -IMPUTATION_TRAIN_SET_NAME = "imputation_train_set.h5" -IMPUTATION_TRAIN_SET_PATH = f"{DATA_SAVING_DIR}/{IMPUTATION_TRAIN_SET_NAME}" -IMPUTATION_VAL_SET_NAME = "imputation_val_set.h5" -IMPUTATION_VAL_SET_PATH = f"{DATA_SAVING_DIR}/{IMPUTATION_VAL_SET_NAME}" - -EPOCHS = 1 - - -class TestLazyLoadingClasses(unittest.TestCase): - logger.info("Running tests for Dataset classes with lazy-loading strategy...") - - # initialize a SAITS model for testing DatasetForMIT and BaseDataset - saits = SAITS( - DATA["n_steps"], - DATA["n_features"], - n_layers=2, - d_model=256, - d_inner=128, - n_heads=4, - d_k=64, - d_v=64, - dropout=0.1, - epochs=EPOCHS, - ) - - # initialize a BRITS model for testing DatasetForBRITS - brits = BRITS( - DATA["n_steps"], - DATA["n_features"], - n_classes=DATA["n_classes"], - rnn_hidden_size=256, - epochs=EPOCHS, - ) - - # initialize a GRUD model for testing DatasetForGRUD - grud = GRUD( - DATA["n_steps"], - DATA["n_features"], - n_classes=DATA["n_classes"], - rnn_hidden_size=256, - epochs=EPOCHS, - ) - - @pytest.mark.xdist_group(name="data-lazy-loading") - def test_0_save_datasets_into_files(self): - # create the dir for saving files - os.makedirs(DATA_SAVING_DIR, exist_ok=True) - - if not os.path.exists(TRAIN_SET_PATH): - save_dict_into_h5( - {"X": DATA["train_X"], "y": DATA["train_y"].astype(float)}, - DATA_SAVING_DIR, - TRAIN_SET_NAME, - ) - - if not os.path.exists(VAL_SET_PATH): - save_dict_into_h5( - {"X": DATA["val_X"], "y": DATA["val_y"].astype(float)}, - DATA_SAVING_DIR, - VAL_SET_NAME, - ) - - if not os.path.exists(IMPUTATION_TRAIN_SET_PATH): - save_dict_into_h5( - {"X": DATA["train_X"]}, DATA_SAVING_DIR, IMPUTATION_TRAIN_SET_NAME - ) - - if not os.path.exists(IMPUTATION_VAL_SET_PATH): - save_dict_into_h5( - { - "X": DATA["val_X"], - "X_intact": DATA["val_X_intact"], - "indicating_mask": DATA["val_X_indicating_mask"], - }, - DATA_SAVING_DIR, - IMPUTATION_VAL_SET_NAME, - ) - - if not os.path.exists(TEST_SET_PATH): - save_dict_into_h5( - { - "X": DATA["test_X"], - "X_intact": DATA["test_X_intact"], - "indicating_mask": DATA["test_X_indicating_mask"], - }, - DATA_SAVING_DIR, - TEST_SET_NAME, - ) - - @pytest.mark.xdist_group(name="data-lazy-loading") - def test_1_DatasetForMIT_BaseDataset(self): - self.saits.fit( - train_set=IMPUTATION_TRAIN_SET_PATH, val_set=IMPUTATION_VAL_SET_PATH - ) - _ = self.saits.impute(X=TEST_SET_PATH) - - @pytest.mark.xdist_group(name="data-lazy-loading") - def test_2_DatasetForBRITS(self): - self.brits.fit(train_set=TRAIN_SET_PATH, val_set=VAL_SET_PATH) - _ = self.brits.classify(X=TEST_SET_PATH) - - @pytest.mark.xdist_group(name="data-lazy-loading") - def test_3_DatasetForGRUD(self): - self.grud.fit(train_set=TRAIN_SET_PATH, val_set=VAL_SET_PATH) - _ = self.grud.classify(X=TEST_SET_PATH) - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/environment_for_conda_test.yml b/tests/environment_for_conda_test.yml index 2630fd28..2cb128d9 100644 --- a/tests/environment_for_conda_test.yml +++ b/tests/environment_for_conda_test.yml @@ -16,9 +16,10 @@ dependencies: - conda-forge::pandas <2.0.0 - conda-forge::h5py - conda-forge::tensorboard - - conda-forge::pygrinder >=0.2 + - conda-forge::pygrinder >=0.4 - conda-forge::tsdb >=0.2 - conda-forge::protobuf <=4.21.12 + - conda-forge::matplotlib - pytorch::pytorch >=1.10.0 # optional diff --git a/tests/forecasting/config.py b/tests/forecasting/config.py index 2e0a27bb..f3f8377b 100644 --- a/tests/forecasting/config.py +++ b/tests/forecasting/config.py @@ -5,12 +5,7 @@ # Created by Wenjie Du # License: BSD-3-Clause -import os - -from tests.global_test_config import ( - DATA, - RESULT_SAVING_DIR, -) +from tests.global_test_config import DATA EPOCHS = 5 N_PRED_STEP = 4 @@ -18,6 +13,4 @@ TRAIN_SET = {"X": DATA["train_X"]} VAL_SET = {"X": DATA["val_X"]} TEST_SET = {"X": DATA["test_X"][:, :-N_PRED_STEP]} -TEST_SET_INTACT = {"X": DATA["test_X_intact"]} - -RESULT_SAVING_DIR_FOR_CLASSIFICATION = os.path.join(RESULT_SAVING_DIR, "forecasting") +TEST_SET_INTACT = {"X": DATA["test_X_ori"]} diff --git a/tests/global_test_config.py b/tests/global_test_config.py index d9dd3b27..4b3ac41f 100644 --- a/tests/global_test_config.py +++ b/tests/global_test_config.py @@ -11,6 +11,7 @@ import torch from pypots.data.generating import gene_random_walk +from pypots.data.saving import save_dict_into_h5 from pypots.utils.logging import logger from pypots.utils.random import set_random_seed @@ -25,13 +26,32 @@ n_samples_each_class=1000, missing_rate=0.1, ) +# DATA = gene_physionet2012() -# The directory for saving the dataset into files for testing -DATA_SAVING_DIR = "h5data_for_tests" +TRAIN_SET = { + "X": DATA["train_X"], + "y": DATA["train_y"].astype(float), +} +VAL_SET = { + "X": DATA["val_X"], + "X_ori": DATA["val_X_ori"], + "y": DATA["val_y"].astype(float), +} +TEST_SET = { + "X": DATA["test_X"], + "X_ori": DATA["test_X_ori"], + "y": DATA["test_y"].astype(float), +} # tensorboard and model files saving directory RESULT_SAVING_DIR = "testing_results" +RESULT_SAVING_DIR_FOR_IMPUTATION = os.path.join(RESULT_SAVING_DIR, "imputation") +RESULT_SAVING_DIR_FOR_CLASSIFICATION = os.path.join(RESULT_SAVING_DIR, "classification") +RESULT_SAVING_DIR_FOR_CLUSTERING = os.path.join(RESULT_SAVING_DIR, "clustering") +RESULT_SAVING_DIR_FOR_FORECASTING = os.path.join(RESULT_SAVING_DIR, "forecasting") +# set the number of epochs for all model training +EPOCHS = 5 # set DEVICES to None if no cuda device is available, to avoid initialization failed while importing test classes n_cuda_devices = torch.cuda.device_count() @@ -45,6 +65,12 @@ # if having no multiple cuda devices, leave it as None to use the default device DEVICE = None +# save the generated dataset into files for testing the lazy-loading strategy +DATA_SAVING_DIR = "h5data_for_tests" +H5_TRAIN_SET_PATH = f"{DATA_SAVING_DIR}/train_set.h5" +H5_VAL_SET_PATH = f"{DATA_SAVING_DIR}/val_set.h5" +H5_TEST_SET_PATH = f"{DATA_SAVING_DIR}/test_set.h5" + def check_tb_and_model_checkpoints_existence(model): # check the tensorboard file existence @@ -57,3 +83,34 @@ def check_tb_and_model_checkpoints_existence(model): # check the model checkpoints existence saved_model_files = [i for i in saved_files if i.endswith(".pypots")] assert len(saved_model_files) > 0, "No model checkpoint saved." + + +if __name__ == "__main__": + if not os.path.exists(H5_TRAIN_SET_PATH): + save_dict_into_h5( + { + "X": DATA["train_X"], + "y": DATA["train_y"].astype(float), + }, + H5_TRAIN_SET_PATH, + ) + + if not os.path.exists(H5_VAL_SET_PATH): + save_dict_into_h5( + { + "X": DATA["val_X"], + "X_ori": DATA["val_X_ori"], + "y": DATA["val_y"].astype(float), + }, + H5_VAL_SET_PATH, + ) + + if not os.path.exists(H5_TEST_SET_PATH): + save_dict_into_h5( + { + "X": DATA["test_X"], + "X_ori": DATA["test_X_ori"], + "y": DATA["test_y"].astype(float), + }, + H5_TEST_SET_PATH, + ) diff --git a/tests/imputation/brits.py b/tests/imputation/brits.py index e5eb2cb7..36efcc7c 100644 --- a/tests/imputation/brits.py +++ b/tests/imputation/brits.py @@ -18,15 +18,16 @@ from pypots.utils.metrics import calc_mae from tests.global_test_config import ( DATA, + EPOCHS, DEVICE, - check_tb_and_model_checkpoints_existence, -) -from tests.imputation.config import ( TRAIN_SET, VAL_SET, TEST_SET, + H5_TRAIN_SET_PATH, + H5_VAL_SET_PATH, + H5_TEST_SET_PATH, RESULT_SAVING_DIR_FOR_IMPUTATION, - EPOCHS, + check_tb_and_model_checkpoints_existence, ) @@ -62,7 +63,7 @@ def test_1_impute(self): imputed_X ).any(), "Output still has missing values after running impute()." test_MAE = calc_mae( - imputed_X, DATA["test_X_intact"], DATA["test_X_indicating_mask"] + imputed_X, DATA["test_X_ori"], DATA["test_X_indicating_mask"] ) logger.info(f"BRITS test_MAE: {test_MAE}") @@ -97,6 +98,21 @@ def test_3_saving_path(self): # test loading the saved model, not necessary, but need to test self.brits.load(saved_model_path) + @pytest.mark.xdist_group(name="imputation-brits") + def test_4_lazy_loading(self): + self.brits.fit(H5_TRAIN_SET_PATH, H5_VAL_SET_PATH) + imputation_results = self.brits.predict(H5_TEST_SET_PATH) + assert not np.isnan( + imputation_results["imputation"] + ).any(), "Output still has missing values after running impute()." + + test_MAE = calc_mae( + imputation_results["imputation"], + DATA["test_X_ori"], + DATA["test_X_indicating_mask"], + ) + logger.info(f"Lazy-loading BRITS test_MAE: {test_MAE}") + if __name__ == "__main__": unittest.main() diff --git a/tests/imputation/config.py b/tests/imputation/config.py deleted file mode 100644 index 26a20269..00000000 --- a/tests/imputation/config.py +++ /dev/null @@ -1,25 +0,0 @@ -""" -Test configs for imputation models. -""" - -# Created by Wenjie Du -# License: BSD-3-Clause - -import os - -from tests.global_test_config import ( - DATA, - RESULT_SAVING_DIR, -) - -EPOCHS = 5 - -TRAIN_SET = {"X": DATA["train_X"]} -VAL_SET = { - "X": DATA["val_X"], - "X_intact": DATA["val_X_intact"], - "indicating_mask": DATA["val_X_indicating_mask"], -} -TEST_SET = {"X": DATA["test_X"]} - -RESULT_SAVING_DIR_FOR_IMPUTATION = os.path.join(RESULT_SAVING_DIR, "imputation") diff --git a/tests/imputation/csdi.py b/tests/imputation/csdi.py index 0ccf1222..3dc88602 100644 --- a/tests/imputation/csdi.py +++ b/tests/imputation/csdi.py @@ -18,15 +18,16 @@ from pypots.utils.metrics import calc_mae, calc_quantile_crps from tests.global_test_config import ( DATA, + EPOCHS, DEVICE, - check_tb_and_model_checkpoints_existence, -) -from tests.imputation.config import ( TRAIN_SET, VAL_SET, TEST_SET, + H5_TRAIN_SET_PATH, + H5_VAL_SET_PATH, + H5_TEST_SET_PATH, RESULT_SAVING_DIR_FOR_IMPUTATION, - EPOCHS, + check_tb_and_model_checkpoints_existence, ) @@ -64,14 +65,14 @@ def test_0_fit(self): def test_1_impute(self): imputed_X = self.csdi.predict(TEST_SET)["imputation"] test_CRPS = calc_quantile_crps( - imputed_X, DATA["test_X_intact"], DATA["test_X_indicating_mask"] + imputed_X, DATA["test_X_ori"], DATA["test_X_indicating_mask"] ) imputed_X = imputed_X.mean(axis=1) # mean over sampling times assert not np.isnan( imputed_X ).any(), "Output still has missing values after running impute()." test_MAE = calc_mae( - imputed_X, DATA["test_X_intact"], DATA["test_X_indicating_mask"] + imputed_X, DATA["test_X_ori"], DATA["test_X_indicating_mask"] ) logger.info(f"CSDI test_MAE: {test_MAE}, test_CRPS: {test_CRPS}") @@ -106,6 +107,24 @@ def test_3_saving_path(self): # test loading the saved model, not necessary, but need to test self.csdi.load(saved_model_path) + @pytest.mark.xdist_group(name="imputation-csdi") + def test_4_lazy_loading(self): + self.csdi.fit(H5_TRAIN_SET_PATH, H5_VAL_SET_PATH) + imputation_results = self.csdi.predict(H5_TEST_SET_PATH) + imputed_X = imputation_results["imputation"] + test_CRPS = calc_quantile_crps( + imputed_X, DATA["test_X_ori"], DATA["test_X_indicating_mask"] + ) + imputed_X = imputed_X.mean(axis=1) # mean over sampling times + assert not np.isnan( + imputed_X + ).any(), "Output still has missing values after running impute()." + + test_MAE = calc_mae( + imputed_X, DATA["test_X_ori"], DATA["test_X_indicating_mask"] + ) + logger.info(f"Lazy-loading CSDI test_MAE: {test_MAE}, test_CRPS: {test_CRPS}") + if __name__ == "__main__": unittest.main() diff --git a/tests/imputation/gpvae.py b/tests/imputation/gpvae.py index b94bff37..167e0dfc 100644 --- a/tests/imputation/gpvae.py +++ b/tests/imputation/gpvae.py @@ -18,15 +18,16 @@ from pypots.utils.metrics import calc_mae from tests.global_test_config import ( DATA, + EPOCHS, DEVICE, - check_tb_and_model_checkpoints_existence, -) -from tests.imputation.config import ( TRAIN_SET, VAL_SET, TEST_SET, + H5_TRAIN_SET_PATH, + H5_VAL_SET_PATH, + H5_TEST_SET_PATH, RESULT_SAVING_DIR_FOR_IMPUTATION, - EPOCHS, + check_tb_and_model_checkpoints_existence, ) @@ -62,7 +63,7 @@ def test_1_impute(self): imputed_X ).any(), "Output still has missing values after running impute()." test_MAE = calc_mae( - imputed_X, DATA["test_X_intact"], DATA["test_X_indicating_mask"] + imputed_X, DATA["test_X_ori"], DATA["test_X_indicating_mask"] ) logger.info(f"GP-VAE test_MAE: {test_MAE}") @@ -97,6 +98,21 @@ def test_3_saving_path(self): # test loading the saved model, not necessary, but need to test self.gp_vae.load(saved_model_path) + @pytest.mark.xdist_group(name="imputation-gpvae") + def test_4_lazy_loading(self): + self.gp_vae.fit(H5_TRAIN_SET_PATH, H5_VAL_SET_PATH) + imputation_results = self.gp_vae.predict(H5_TEST_SET_PATH) + assert not np.isnan( + imputation_results["imputation"] + ).any(), "Output still has missing values after running impute()." + + test_MAE = calc_mae( + imputation_results["imputation"], + DATA["test_X_ori"], + DATA["test_X_indicating_mask"], + ) + logger.info(f"Lazy-loading GP-VAE test_MAE: {test_MAE}") + if __name__ == "__main__": unittest.main() diff --git a/tests/imputation/locf.py b/tests/imputation/locf.py index b43b7414..626623b0 100644 --- a/tests/imputation/locf.py +++ b/tests/imputation/locf.py @@ -17,18 +17,20 @@ from pypots.utils.metrics import calc_mae from tests.global_test_config import ( DATA, -) -from tests.imputation.config import ( + DEVICE, TEST_SET, + H5_TRAIN_SET_PATH, + H5_VAL_SET_PATH, + H5_TEST_SET_PATH, ) class TestLOCF(unittest.TestCase): logger.info("Running tests for an imputation model LOCF...") - locf_zero = LOCF(first_step_imputation="zero") - locf_backward = LOCF(first_step_imputation="backward") - locf_mean = LOCF(first_step_imputation="mean") - locf_nan = LOCF(first_step_imputation="nan") + locf_zero = LOCF(first_step_imputation="zero", device=DEVICE) + locf_backward = LOCF(first_step_imputation="backward", device=DEVICE) + locf_mean = LOCF(first_step_imputation="mean", device=DEVICE) + locf_nan = LOCF(first_step_imputation="nan", device=DEVICE) @pytest.mark.xdist_group(name="imputation-locf") def test_0_impute(self): @@ -38,7 +40,7 @@ def test_0_impute(self): test_X_imputed_zero ).any(), "Output still has missing values after running impute()." test_MAE = calc_mae( - test_X_imputed_zero, DATA["test_X_intact"], DATA["test_X_indicating_mask"] + test_X_imputed_zero, DATA["test_X_ori"], DATA["test_X_indicating_mask"] ) logger.info(f"LOCF (zero) test_MAE: {test_MAE}") @@ -48,7 +50,7 @@ def test_0_impute(self): ).any(), "Output still has missing values after running impute()." test_MAE = calc_mae( test_X_imputed_backward, - DATA["test_X_intact"], + DATA["test_X_ori"], DATA["test_X_indicating_mask"], ) logger.info(f"LOCF (backward) test_MAE: {test_MAE}") @@ -59,7 +61,7 @@ def test_0_impute(self): ).any(), "Output still has missing values after running impute()." test_MAE = calc_mae( test_X_imputed_mean, - DATA["test_X_intact"], + DATA["test_X_ori"], DATA["test_X_indicating_mask"], ) logger.info(f"LOCF (mean) test_MAE: {test_MAE}") @@ -71,7 +73,7 @@ def test_0_impute(self): # if input data is torch tensor X = torch.from_numpy(np.copy(TEST_SET["X"])) - test_X_intact = torch.from_numpy(np.copy(DATA["test_X_intact"])) + test_X_ori = torch.from_numpy(np.copy(DATA["test_X_ori"])) test_X_indicating_mask = torch.from_numpy( np.copy(DATA["test_X_indicating_mask"]) ) @@ -80,7 +82,7 @@ def test_0_impute(self): assert not torch.isnan( test_X_imputed_zero ).any(), "Output still has missing values after running impute()." - test_MAE = calc_mae(test_X_imputed_zero, test_X_intact, test_X_indicating_mask) + test_MAE = calc_mae(test_X_imputed_zero, test_X_ori, test_X_indicating_mask) logger.info(f"LOCF (zero) test_MAE: {test_MAE}") test_X_imputed_backward = self.locf_backward.predict({"X": X})["imputation"] @@ -89,7 +91,7 @@ def test_0_impute(self): ).any(), "Output still has missing values after running impute()." test_MAE = calc_mae( test_X_imputed_backward, - test_X_intact, + test_X_ori, test_X_indicating_mask, ) logger.info(f"LOCF (backward) test_MAE: {test_MAE}") @@ -100,7 +102,7 @@ def test_0_impute(self): ).any(), "Output still has missing values after running impute()." test_MAE = calc_mae( test_X_imputed_mean, - test_X_intact, + test_X_ori, test_X_indicating_mask, ) logger.info(f"LOCF (mean) test_MAE: {test_MAE}") @@ -110,6 +112,21 @@ def test_0_impute(self): assert num_of_missing > 0, "Output should have missing data but not." logger.info(f"LOCF (nan) still have {num_of_missing} missing values.") + @pytest.mark.xdist_group(name="imputation-locf") + def test_4_lazy_loading(self): + self.locf_backward.fit(H5_TRAIN_SET_PATH, H5_VAL_SET_PATH) + imputation_results = self.locf_backward.predict(H5_TEST_SET_PATH) + assert not np.isnan( + imputation_results["imputation"] + ).any(), "Output still has missing values after running impute()." + + test_MAE = calc_mae( + imputation_results["imputation"], + DATA["test_X_ori"], + DATA["test_X_indicating_mask"], + ) + logger.info(f"Lazy-loading LOCF test_MAE: {test_MAE}") + if __name__ == "__main__": unittest.main() diff --git a/tests/imputation/mrnn.py b/tests/imputation/mrnn.py index b3074f09..3649d27b 100644 --- a/tests/imputation/mrnn.py +++ b/tests/imputation/mrnn.py @@ -18,15 +18,16 @@ from pypots.utils.metrics import calc_mae from tests.global_test_config import ( DATA, + EPOCHS, DEVICE, - check_tb_and_model_checkpoints_existence, -) -from tests.imputation.config import ( TRAIN_SET, VAL_SET, TEST_SET, + H5_TRAIN_SET_PATH, + H5_VAL_SET_PATH, + H5_TEST_SET_PATH, RESULT_SAVING_DIR_FOR_IMPUTATION, - EPOCHS, + check_tb_and_model_checkpoints_existence, ) @@ -62,7 +63,7 @@ def test_1_impute(self): imputed_X ).any(), "Output still has missing values after running impute()." test_MAE = calc_mae( - imputed_X, DATA["test_X_intact"], DATA["test_X_indicating_mask"] + imputed_X, DATA["test_X_ori"], DATA["test_X_indicating_mask"] ) logger.info(f"MRNN test_MAE: {test_MAE}") @@ -97,6 +98,21 @@ def test_3_saving_path(self): # test loading the saved model, not necessary, but need to test self.mrnn.load(saved_model_path) + @pytest.mark.xdist_group(name="imputation-mrnn") + def test_4_lazy_loading(self): + self.mrnn.fit(H5_TRAIN_SET_PATH, H5_VAL_SET_PATH) + imputation_results = self.mrnn.predict(H5_TEST_SET_PATH) + assert not np.isnan( + imputation_results["imputation"] + ).any(), "Output still has missing values after running impute()." + + test_MAE = calc_mae( + imputation_results["imputation"], + DATA["test_X_ori"], + DATA["test_X_indicating_mask"], + ) + logger.info(f"Lazy-loading MRNN test_MAE: {test_MAE}") + if __name__ == "__main__": unittest.main() diff --git a/tests/imputation/saits.py b/tests/imputation/saits.py index d25f9361..33c6f6cd 100644 --- a/tests/imputation/saits.py +++ b/tests/imputation/saits.py @@ -18,15 +18,16 @@ from pypots.utils.metrics import calc_mae from tests.global_test_config import ( DATA, + EPOCHS, DEVICE, - check_tb_and_model_checkpoints_existence, -) -from tests.imputation.config import ( TRAIN_SET, VAL_SET, TEST_SET, + H5_TRAIN_SET_PATH, + H5_VAL_SET_PATH, + H5_TEST_SET_PATH, RESULT_SAVING_DIR_FOR_IMPUTATION, - EPOCHS, + check_tb_and_model_checkpoints_existence, ) @@ -73,7 +74,7 @@ def test_1_impute(self): test_MAE = calc_mae( imputation_results["imputation"], - DATA["test_X_intact"], + DATA["test_X_ori"], DATA["test_X_indicating_mask"], ) logger.info(f"SAITS test_MAE: {test_MAE}") @@ -109,6 +110,21 @@ def test_3_saving_path(self): # test loading the saved model, not necessary, but need to test self.saits.load(saved_model_path) + @pytest.mark.xdist_group(name="imputation-saits") + def test_4_lazy_loading(self): + self.saits.fit(H5_TRAIN_SET_PATH, H5_VAL_SET_PATH) + imputation_results = self.saits.predict(H5_TEST_SET_PATH) + assert not np.isnan( + imputation_results["imputation"] + ).any(), "Output still has missing values after running impute()." + + test_MAE = calc_mae( + imputation_results["imputation"], + DATA["test_X_ori"], + DATA["test_X_indicating_mask"], + ) + logger.info(f"Lazy-loading SAITS test_MAE: {test_MAE}") + if __name__ == "__main__": unittest.main() diff --git a/tests/imputation/timesnet.py b/tests/imputation/timesnet.py index 33bfae3e..34f9da03 100644 --- a/tests/imputation/timesnet.py +++ b/tests/imputation/timesnet.py @@ -18,15 +18,16 @@ from pypots.utils.metrics import calc_mae from tests.global_test_config import ( DATA, + EPOCHS, DEVICE, - check_tb_and_model_checkpoints_existence, -) -from tests.imputation.config import ( TRAIN_SET, VAL_SET, TEST_SET, + H5_TRAIN_SET_PATH, + H5_VAL_SET_PATH, + H5_TEST_SET_PATH, RESULT_SAVING_DIR_FOR_IMPUTATION, - EPOCHS, + check_tb_and_model_checkpoints_existence, ) @@ -69,7 +70,7 @@ def test_1_impute(self): test_MAE = calc_mae( imputation_results["imputation"], - DATA["test_X_intact"], + DATA["test_X_ori"], DATA["test_X_indicating_mask"], ) logger.info(f"TimesNet test_MAE: {test_MAE}") @@ -107,6 +108,21 @@ def test_3_saving_path(self): # test loading the saved model, not necessary, but need to test self.timesnet.load(saved_model_path) + @pytest.mark.xdist_group(name="imputation-timesnet") + def test_4_lazy_loading(self): + self.timesnet.fit(H5_TRAIN_SET_PATH, H5_VAL_SET_PATH) + imputation_results = self.timesnet.predict(H5_TEST_SET_PATH) + assert not np.isnan( + imputation_results["imputation"] + ).any(), "Output still has missing values after running impute()." + + test_MAE = calc_mae( + imputation_results["imputation"], + DATA["test_X_ori"], + DATA["test_X_indicating_mask"], + ) + logger.info(f"Lazy-loading TimesNet test_MAE: {test_MAE}") + if __name__ == "__main__": unittest.main() diff --git a/tests/imputation/transformer.py b/tests/imputation/transformer.py index 15624dc4..88e02802 100644 --- a/tests/imputation/transformer.py +++ b/tests/imputation/transformer.py @@ -18,15 +18,16 @@ from pypots.utils.metrics import calc_mae from tests.global_test_config import ( DATA, + EPOCHS, DEVICE, - check_tb_and_model_checkpoints_existence, -) -from tests.imputation.config import ( TRAIN_SET, VAL_SET, TEST_SET, + H5_TRAIN_SET_PATH, + H5_VAL_SET_PATH, + H5_TEST_SET_PATH, RESULT_SAVING_DIR_FOR_IMPUTATION, - EPOCHS, + check_tb_and_model_checkpoints_existence, ) @@ -68,7 +69,7 @@ def test_1_impute(self): imputed_X ).any(), "Output still has missing values after running impute()." test_MAE = calc_mae( - imputed_X, DATA["test_X_intact"], DATA["test_X_indicating_mask"] + imputed_X, DATA["test_X_ori"], DATA["test_X_indicating_mask"] ) logger.info(f"Transformer test_MAE: {test_MAE}") @@ -106,6 +107,21 @@ def test_3_saving_path(self): # test loading the saved model, not necessary, but need to test self.transformer.load(saved_model_path) + @pytest.mark.xdist_group(name="imputation-transformer") + def test_4_lazy_loading(self): + self.transformer.fit(H5_TRAIN_SET_PATH, H5_VAL_SET_PATH) + imputation_results = self.transformer.predict(H5_TEST_SET_PATH) + assert not np.isnan( + imputation_results["imputation"] + ).any(), "Output still has missing values after running impute()." + + test_MAE = calc_mae( + imputation_results["imputation"], + DATA["test_X_ori"], + DATA["test_X_indicating_mask"], + ) + logger.info(f"Lazy-loading Transformer test_MAE: {test_MAE}") + if __name__ == "__main__": unittest.main() diff --git a/tests/imputation/usgan.py b/tests/imputation/usgan.py index 0ff25ea3..af99b4e7 100644 --- a/tests/imputation/usgan.py +++ b/tests/imputation/usgan.py @@ -18,15 +18,16 @@ from pypots.utils.metrics import calc_mae from tests.global_test_config import ( DATA, + EPOCHS, DEVICE, - check_tb_and_model_checkpoints_existence, -) -from tests.imputation.config import ( TRAIN_SET, VAL_SET, TEST_SET, + H5_TRAIN_SET_PATH, + H5_VAL_SET_PATH, + H5_TEST_SET_PATH, RESULT_SAVING_DIR_FOR_IMPUTATION, - EPOCHS, + check_tb_and_model_checkpoints_existence, ) @@ -42,7 +43,7 @@ class TestUSGAN(unittest.TestCase): D_optimizer = Adam(lr=0.001, weight_decay=1e-5) # initialize a US-GAN model - us_gan = USGAN( + usgan = USGAN( DATA["n_steps"], DATA["n_features"], 256, @@ -55,36 +56,32 @@ class TestUSGAN(unittest.TestCase): @pytest.mark.xdist_group(name="imputation-usgan") def test_0_fit(self): - self.us_gan.fit(TRAIN_SET, VAL_SET) + self.usgan.fit(TRAIN_SET, VAL_SET) @pytest.mark.xdist_group(name="imputation-usgan") def test_1_impute(self): - imputed_X = self.us_gan.impute(TEST_SET) + imputed_X = self.usgan.impute(TEST_SET) assert not np.isnan( imputed_X ).any(), "Output still has missing values after running impute()." test_MAE = calc_mae( - imputed_X, DATA["test_X_intact"], DATA["test_X_indicating_mask"] + imputed_X, DATA["test_X_ori"], DATA["test_X_indicating_mask"] ) logger.info(f"US-GAN test_MAE: {test_MAE}") @pytest.mark.xdist_group(name="imputation-usgan") def test_2_parameters(self): - assert hasattr(self.us_gan, "model") and self.us_gan.model is not None + assert hasattr(self.usgan, "model") and self.usgan.model is not None - assert ( - hasattr(self.us_gan, "G_optimizer") and self.us_gan.G_optimizer is not None - ) - assert ( - hasattr(self.us_gan, "D_optimizer") and self.us_gan.D_optimizer is not None - ) + assert hasattr(self.usgan, "G_optimizer") and self.usgan.G_optimizer is not None + assert hasattr(self.usgan, "D_optimizer") and self.usgan.D_optimizer is not None - assert hasattr(self.us_gan, "best_loss") - self.assertNotEqual(self.us_gan.best_loss, float("inf")) + assert hasattr(self.usgan, "best_loss") + self.assertNotEqual(self.usgan.best_loss, float("inf")) assert ( - hasattr(self.us_gan, "best_model_dict") - and self.us_gan.best_model_dict is not None + hasattr(self.usgan, "best_model_dict") + and self.usgan.best_model_dict is not None ) @pytest.mark.xdist_group(name="imputation-usgan") @@ -95,14 +92,29 @@ def test_3_saving_path(self): ), f"file {self.saving_path} does not exist" # check if the tensorboard file and model checkpoints exist - check_tb_and_model_checkpoints_existence(self.us_gan) + check_tb_and_model_checkpoints_existence(self.usgan) # save the trained model into file, and check if the path exists saved_model_path = os.path.join(self.saving_path, self.model_save_name) - self.us_gan.save(saved_model_path) + self.usgan.save(saved_model_path) # test loading the saved model, not necessary, but need to test - self.us_gan.load(saved_model_path) + self.usgan.load(saved_model_path) + + @pytest.mark.xdist_group(name="imputation-usgan") + def test_4_lazy_loading(self): + self.usgan.fit(H5_TRAIN_SET_PATH, H5_VAL_SET_PATH) + imputation_results = self.usgan.predict(H5_TEST_SET_PATH) + assert not np.isnan( + imputation_results["imputation"] + ).any(), "Output still has missing values after running impute()." + + test_MAE = calc_mae( + imputation_results["imputation"], + DATA["test_X_ori"], + DATA["test_X_indicating_mask"], + ) + logger.info(f"Lazy-loading US-GAN test_MAE: {test_MAE}") if __name__ == "__main__": diff --git a/tests/optim/adadelta.py b/tests/optim/adadelta.py index c7eb6e6d..6d11c10b 100644 --- a/tests/optim/adadelta.py +++ b/tests/optim/adadelta.py @@ -47,7 +47,7 @@ def test_0_fit(self): imputed_X ).any(), "Output still has missing values after running impute()." test_MAE = calc_mae( - imputed_X, DATA["test_X_intact"], DATA["test_X_indicating_mask"] + imputed_X, DATA["test_X_ori"], DATA["test_X_indicating_mask"] ) logger.info(f"SAITS test_MAE: {test_MAE}") diff --git a/tests/optim/adagrad.py b/tests/optim/adagrad.py index 7cb2a988..2ab28b4a 100644 --- a/tests/optim/adagrad.py +++ b/tests/optim/adagrad.py @@ -47,7 +47,7 @@ def test_0_fit(self): imputed_X ).any(), "Output still has missing values after running impute()." test_MAE = calc_mae( - imputed_X, DATA["test_X_intact"], DATA["test_X_indicating_mask"] + imputed_X, DATA["test_X_ori"], DATA["test_X_indicating_mask"] ) logger.info(f"SAITS test_MAE: {test_MAE}") diff --git a/tests/optim/adam.py b/tests/optim/adam.py index 9f583aee..4aedc2c5 100644 --- a/tests/optim/adam.py +++ b/tests/optim/adam.py @@ -47,7 +47,7 @@ def test_0_fit(self): imputed_X ).any(), "Output still has missing values after running impute()." test_MAE = calc_mae( - imputed_X, DATA["test_X_intact"], DATA["test_X_indicating_mask"] + imputed_X, DATA["test_X_ori"], DATA["test_X_indicating_mask"] ) logger.info(f"SAITS test_MAE: {test_MAE}") diff --git a/tests/optim/adamw.py b/tests/optim/adamw.py index e785e9f6..4f2164ba 100644 --- a/tests/optim/adamw.py +++ b/tests/optim/adamw.py @@ -47,7 +47,7 @@ def test_0_fit(self): imputed_X ).any(), "Output still has missing values after running impute()." test_MAE = calc_mae( - imputed_X, DATA["test_X_intact"], DATA["test_X_indicating_mask"] + imputed_X, DATA["test_X_ori"], DATA["test_X_indicating_mask"] ) logger.info(f"SAITS test_MAE: {test_MAE}") diff --git a/tests/optim/config.py b/tests/optim/config.py index 6eb7ffc4..bc7dbff5 100644 --- a/tests/optim/config.py +++ b/tests/optim/config.py @@ -10,8 +10,7 @@ TRAIN_SET = {"X": DATA["train_X"]} VAL_SET = { "X": DATA["val_X"], - "X_intact": DATA["val_X_intact"], - "indicating_mask": DATA["val_X_indicating_mask"], + "X_ori": DATA["val_X_ori"], } TEST_SET = {"X": DATA["test_X"]} diff --git a/tests/optim/lr_schedulers.py b/tests/optim/lr_schedulers.py index 2aa1c520..e74e8830 100644 --- a/tests/optim/lr_schedulers.py +++ b/tests/optim/lr_schedulers.py @@ -75,7 +75,7 @@ def test_0_lambda_lrs(self): imputed_X ).any(), "Output still has missing values after running impute()." test_MAE = calc_mae( - imputed_X, DATA["test_X_intact"], DATA["test_X_indicating_mask"] + imputed_X, DATA["test_X_ori"], DATA["test_X_indicating_mask"] ) logger.info(f"SAITS test_MAE: {test_MAE}") @@ -103,7 +103,7 @@ def test_1_multiplicative_lrs(self): imputed_X ).any(), "Output still has missing values after running impute()." test_MAE = calc_mae( - imputed_X, DATA["test_X_intact"], DATA["test_X_indicating_mask"] + imputed_X, DATA["test_X_ori"], DATA["test_X_indicating_mask"] ) logger.info(f"SAITS test_MAE: {test_MAE}") @@ -131,7 +131,7 @@ def test_2_step_lrs(self): imputed_X ).any(), "Output still has missing values after running impute()." test_MAE = calc_mae( - imputed_X, DATA["test_X_intact"], DATA["test_X_indicating_mask"] + imputed_X, DATA["test_X_ori"], DATA["test_X_indicating_mask"] ) logger.info(f"SAITS test_MAE: {test_MAE}") @@ -159,7 +159,7 @@ def test_3_multistep_lrs(self): imputed_X ).any(), "Output still has missing values after running impute()." test_MAE = calc_mae( - imputed_X, DATA["test_X_intact"], DATA["test_X_indicating_mask"] + imputed_X, DATA["test_X_ori"], DATA["test_X_indicating_mask"] ) logger.info(f"SAITS test_MAE: {test_MAE}") @@ -188,7 +188,7 @@ def test_4_constant_lrs(self): imputed_X ).any(), "Output still has missing values after running impute()." test_MAE = calc_mae( - imputed_X, DATA["test_X_intact"], DATA["test_X_indicating_mask"] + imputed_X, DATA["test_X_ori"], DATA["test_X_indicating_mask"] ) logger.info(f"SAITS test_MAE: {test_MAE}") @@ -216,7 +216,7 @@ def test_5_linear_lrs(self): imputed_X ).any(), "Output still has missing values after running impute()." test_MAE = calc_mae( - imputed_X, DATA["test_X_intact"], DATA["test_X_indicating_mask"] + imputed_X, DATA["test_X_ori"], DATA["test_X_indicating_mask"] ) logger.info(f"SAITS test_MAE: {test_MAE}") @@ -244,6 +244,6 @@ def test_6_exponential_lrs(self): imputed_X ).any(), "Output still has missing values after running impute()." test_MAE = calc_mae( - imputed_X, DATA["test_X_intact"], DATA["test_X_indicating_mask"] + imputed_X, DATA["test_X_ori"], DATA["test_X_indicating_mask"] ) logger.info(f"SAITS test_MAE: {test_MAE}") diff --git a/tests/optim/rmsprop.py b/tests/optim/rmsprop.py index f4a3f53c..e22633c9 100644 --- a/tests/optim/rmsprop.py +++ b/tests/optim/rmsprop.py @@ -47,7 +47,7 @@ def test_0_fit(self): imputed_X ).any(), "Output still has missing values after running impute()." test_MAE = calc_mae( - imputed_X, DATA["test_X_intact"], DATA["test_X_indicating_mask"] + imputed_X, DATA["test_X_ori"], DATA["test_X_indicating_mask"] ) logger.info(f"SAITS test_MAE: {test_MAE}") diff --git a/tests/optim/sgd.py b/tests/optim/sgd.py index 7dec3bf3..74fa5c1f 100644 --- a/tests/optim/sgd.py +++ b/tests/optim/sgd.py @@ -47,7 +47,7 @@ def test_0_fit(self): imputed_X ).any(), "Output still has missing values after running impute()." test_MAE = calc_mae( - imputed_X, DATA["test_X_intact"], DATA["test_X_indicating_mask"] + imputed_X, DATA["test_X_ori"], DATA["test_X_indicating_mask"] ) logger.info(f"SAITS test_MAE: {test_MAE}") diff --git a/tests/utils/visual.py b/tests/utils/visual.py new file mode 100644 index 00000000..5ba8cad1 --- /dev/null +++ b/tests/utils/visual.py @@ -0,0 +1,35 @@ +""" +Test cases for the functions and classes in package `pypots.utils.visual`. +""" + +# Created by Wenjie Du +# License: BSD-3-Clause + +import unittest + +import numpy as np + +from pypots.imputation import LOCF +from pypots.utils.visual.data import plot_data, plot_missingness +from tests.global_test_config import TEST_SET + + +class TestVisual(unittest.TestCase): + locf = LOCF() + imputed_test_set = locf.predict(TEST_SET) + imputed_X = imputed_test_set["imputation"] + X_with_missingness = TEST_SET["X"] + X_ori = TEST_SET["X_ori"] + + def test_plot_data(self): + plot_data(self.X_with_missingness, self.X_ori, self.imputed_X, sample_idx=10) + + def test_plot_missingness(self): + plot_missingness(self.X_with_missingness, max_step=24, sample_idx=10) + plot_missingness( + ~np.isnan(self.X_with_missingness[10]), max_step=24, sample_idx=10 + ) + + +if __name__ == "__main__": + unittest.main()