feat: add latent align dataset (#25)
* feat: generalize DomainAdaptionDataset

* feat: generalize to_tensor

* feat: add LatentAlignDataModule

* docs: add docstrings
tilman151 authored Mar 2, 2023
1 parent 0341b73 commit 5ed630b
Showing 6 changed files with 330 additions and 82 deletions.
6 changes: 5 additions & 1 deletion rul_datasets/__init__.py
@@ -2,7 +2,11 @@

import warnings

-from .adaption import DomainAdaptionDataModule, PretrainingAdaptionDataModule
+from .adaption import (
+    DomainAdaptionDataModule,
+    PretrainingAdaptionDataModule,
+    LatentAlignDataModule,
+)
from .baseline import BaselineDataModule, PretrainingBaselineDataModule
from .core import RulDataModule
from .reader import CmapssReader, FemtoReader, XjtuSyReader
214 changes: 178 additions & 36 deletions rul_datasets/adaption.py
@@ -2,13 +2,15 @@

import warnings
from copy import deepcopy
-from typing import List, Optional, Any, Tuple, Callable
+from typing import List, Optional, Any, Tuple, Callable, Sequence

import numpy as np
import pytorch_lightning as pl
import torch
from torch.utils.data import DataLoader, Dataset
+from torch.utils.data.dataset import ConcatDataset, TensorDataset

+from rul_datasets import utils
from rul_datasets.core import PairedRulDataset, RulDataModule


@@ -19,7 +21,7 @@ class DomainAdaptionDataModule(pl.LightningDataModule):
The training data of both domains is wrapped in an [AdaptionDataset]
[rul_datasets.adaption.AdaptionDataset] which provides a random sample of the
target domain with each sample of the source domain. It provides the validation and
-test splits of both domains, and a [paired dataset]
+test splits of both domains, and optionally a [paired dataset]
[rul_datasets.core.PairedRulDataset] for both.
Examples:
@@ -30,8 +32,8 @@ class DomainAdaptionDataModule(pl.LightningDataModule):
>>> target = rul_datasets.RulDataModule(fd2, 32)
>>> dm = rul_datasets.DomainAdaptionDataModule(source, target)
>>> train_1_2 = dm.train_dataloader()
->>> val_1, val_2, paired_val_1_2 = dm.val_dataloader()
->>> test_1, test_2, paired_test_1_2 = dm.test_dataloader()
+>>> val_1, val_2 = dm.val_dataloader()
+>>> test_1, test_2 = dm.test_dataloader()
"""

def __init__(
@@ -206,15 +208,153 @@ def _get_paired_dataset(self) -> PairedRulDataset:
return paired


class LatentAlignDataModule(DomainAdaptionDataModule):
"""
A higher-order [data module][pytorch_lightning.core.LightningDataModule] based on
[DomainAdaptionDataModule][rul_datasets.adaption.DomainAdaptionDataModule].
It is specifically made to work with the latent space alignment approach by Zhang
et al. The training data of both domains is wrapped in an [AdaptionDataset]
[rul_datasets.adaption.AdaptionDataset] which splits the data into healthy and
degrading parts. For each sample of degrading source data, a random sample of
degrading target data and a healthy sample of either source or target data are
drawn. The number of degradation steps is supplied for each degrading sample,
as well.
The data module also provides the validation and test splits of both domains, and
optionally a [paired dataset][rul_datasets.core.PairedRulDataset] for both.
Examples:
>>> import rul_datasets
>>> fd1 = rul_datasets.CmapssReader(fd=1, window_size=20)
>>> fd2 = rul_datasets.CmapssReader(fd=2, percent_broken=0.8)
>>> source = rul_datasets.RulDataModule(fd1, 32)
>>> target = rul_datasets.RulDataModule(fd2, 32)
>>> dm = rul_datasets.LatentAlignDataModule(source, target, split_by_max_rul=True)
>>> train_1_2 = dm.train_dataloader()
>>> val_1, val_2 = dm.val_dataloader()
>>> test_1, test_2 = dm.test_dataloader()
"""

def __init__(
self,
source: RulDataModule,
target: RulDataModule,
paired_val: bool = False,
split_by_max_rul: bool = False,
split_by_steps: Optional[int] = None,
) -> None:
"""
Create a new latent align data module from a source and target
[RulDataModule][rul_datasets.RulDataModule]. The source domain is considered
labeled and the target domain unlabeled.
The source and target data modules are checked for compatibility (see
[RulDataModule][rul_datasets.core.RulDataModule.check_compatibility]). These
checks include that the `fd` differs between them, as they would otherwise
come from the same domain.
The healthy and degrading data can be split by either maximum RUL value or
the number of time steps. See [split_healthy]
[rul_datasets.adaption.split_healthy] for more information.
Args:
source: The data module of the labeled source domain.
target: The data module of the unlabeled target domain.
paired_val: Whether to include paired data in validation.
split_by_max_rul: Whether to split healthy and degrading by max RUL value.
split_by_steps: Split the healthy and degrading data after this number of
time steps.
"""
super().__init__(source, target, paired_val)

if not split_by_max_rul and (split_by_steps is None):
raise ValueError(
"Either 'split_by_max_rul' or 'split_by_steps' need to be set."
)

self.split_by_max_rul = split_by_max_rul
self.split_by_steps = split_by_steps

def _to_dataset(self, split: str) -> "AdaptionDataset":
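# the labeled source data can always be split by its max RUL value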
source_healthy, source_degraded = split_healthy(
*self.source.reader.load_split(split), by_max_rul=True
)
target_healthy, target_degraded = split_healthy(
*self.target.reader.load_split(split),
self.split_by_max_rul,
self.split_by_steps,
)
healthy: Dataset = ConcatDataset([source_healthy, target_healthy])
dataset = AdaptionDataset(source_degraded, target_degraded, healthy)

return dataset
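Given the tuple layout of `AdaptionDataset` below, a training batch from this module should unpack as in the following sketch (the variable names are illustrative, not part of the API):

```python
# Sketch: the labeled source tuple keeps its RUL labels, while the unlabeled
# target and healthy tuples have their label tensor dropped (see
# AdaptionDataset.__getitem__ below).
features_src, steps_src, rul_src, features_tgt, steps_tgt, features_healthy = next(
    iter(dm.train_dataloader())
)
```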


def split_healthy(
features: List[np.ndarray],
targets: List[np.ndarray],
by_max_rul: bool = False,
by_steps: Optional[int] = None,
) -> Tuple[TensorDataset, TensorDataset]:
"""
Split the feature and target time series into healthy and degrading parts and
return a dataset of each.
If `by_max_rul` is set to `True`, the time steps with the maximum RUL value in
each time series are considered healthy. This option is intended for labeled
data with piece-wise linear RUL functions. If `by_steps` is set to an integer,
the first `by_steps` time steps of each series are considered healthy. This
option is intended for unlabeled data or data with a linear RUL function.
Exactly one of these options has to be set, as they are mutually exclusive.
Args:
features: List of feature time series.
targets: List of target time series.
by_max_rul: Whether to split healthy and degrading data by max RUL value.
by_steps: Split healthy and degrading data after this number of time steps.
Returns:
healthy: Dataset of healthy data.
degrading: Dataset of degrading data.
"""
if not by_max_rul and (by_steps is None):
raise ValueError("Either 'by_max_rul' or 'by_steps' need to be set.")

healthy = []
degraded = []
for feature, target in zip(features, targets):
# split after the last time step with max RUL or after the given number of steps
split_idx = [len(target) - np.argmax(target[::-1]) if by_max_rul else by_steps]
healthy_feature, degraded_feature = np.split(feature, split_idx) # type: ignore
healthy_target, degraded_target = np.split(target, split_idx) # type: ignore
degradation_steps = np.arange(len(degraded_target))
healthy.append((healthy_feature, healthy_target))
degraded.append((degraded_feature, degradation_steps, degraded_target))

healthy_dataset = _to_dataset(healthy)
degraded_dataset = _to_dataset(degraded)

return healthy_dataset, degraded_dataset
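A minimal sketch of `split_healthy` on synthetic data with a piece-wise linear RUL function (the window shape `(50, 20, 14)` and the RUL values are assumptions for illustration):

```python
import numpy as np

# one run: 10 windows at the maximum RUL of 40, then 40 degrading windows
features = [np.random.randn(50, 20, 14).astype(np.float32)]
targets = [np.concatenate([np.full(10, 40.0), np.arange(39.0, -1.0, -1.0)])]

healthy, degraded = split_healthy(features, targets, by_max_rul=True)
print(len(healthy), len(degraded))  # expected: 10 40

# degrading samples carry their degradation step index as a second element
feat, steps, rul = degraded[0]
```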


def _to_dataset(data: Sequence[Tuple[np.ndarray, ...]]) -> TensorDataset:
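# regroup the per-run tuples by element, convert them to tensors, and concatenate the runs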
tensor_data = [torch.cat(h) for h in utils.to_tensor(*zip(*data))]
dataset = TensorDataset(*tensor_data)

return dataset


class AdaptionDataset(Dataset):
"""
A torch [dataset][torch.utils.data.Dataset] for unsupervised domain adaption. The
-dataset takes a source and a target [dataset][torch.utils.data.Dataset] and
-combines them. For each label/features pair from the source dataset, a random
-sample of features is drawn from the target data. The target datasets labels are
-omitted. The datasets length is determined by the source dataset. This setup can
-be used to train with common unsupervised domain adaption methods like DAN,
-DANN or JAN.
+dataset takes a labeled source and one or multiple unlabeled target [datasets]
+[torch.utils.data.Dataset] and combines them.
+For each label/features pair from the source dataset, a random sample of features
+is drawn from each target dataset. The datasets are supposed to provide a sample
+as a tuple of tensors. The target datasets' labels are assumed to be the last
+element of the tuple and are omitted. The dataset's length is determined by the
+source dataset. This setup can be used to train with common unsupervised domain
+adaption methods like DAN, DANN or JAN.
Examples:
>>> import torch
@@ -225,53 +365,55 @@ class AdaptionDataset(Dataset):
>>> source_features, source_label, target_features = dataset[0]
"""

-    _target_idx: np.ndarray
-    _get_target_idx: Callable
+    _unlabeled_idx: np.ndarray
+    _get_unlabeled_idx: Callable

def __init__(
-        self, source: Dataset, target: Dataset, deterministic: bool = False
+        self, labeled: Dataset, *unlabeled: Dataset, deterministic: bool = False
) -> None:
"""
-Create a new adaption data set from a source and a target domain dataset.
+Create a new adaption dataset from a labeled source and one or multiple
+unlabeled target datasets.
-By default, a new target sample is drawn when a source sample is accessed.
-This is the recommended setting for training. To deactivate this behavior and
-fix the pairing of source and target samples, set `deterministic` to `True`.
-This is the recommended setting for evaluation.
+By default, a random sample is drawn from each target dataset when a source
+sample is accessed. This is the recommended setting for training. To
+deactivate this behavior and fix the pairing of source and target samples,
+set `deterministic` to `True`. This is the recommended setting for evaluation.
Args:
-    source: The dataset from the source domain.
-    target: The dataset from the target domain.
+    labeled: The dataset from the labeled domain.
+    *unlabeled: The dataset(s) from the unlabeled domain(s).
deterministic: Return the same target sample for each source sample.
"""
-        self.source = source
-        self.target = target
+        self.labeled = labeled
+        self.unlabeled = unlabeled
self.deterministic = deterministic
-        self._target_len = len(target)  # type: ignore
+        self._unlabeled_len = [len(ul) for ul in self.unlabeled]  # type: ignore

if self.deterministic:
self._rng = np.random.default_rng(seed=42)
-            self._get_target_idx = self._get_deterministic_target_idx
-            self._target_idx = self._rng.integers(0, self._target_len, len(self))
+            size = (len(self), len(self._unlabeled_len))
+            self._unlabeled_idx = self._rng.integers(0, self._unlabeled_len, size)
+            self._get_unlabeled_idx = self._get_deterministic_unlabeled_idx
else:
self._rng = np.random.default_rng()
-            self._get_target_idx = self._get_random_target_idx
+            self._get_unlabeled_idx = self._get_random_unlabeled_idx

-    def _get_random_target_idx(self, idx: int) -> int:
-        return self._rng.integers(0, self._target_len)
+    def _get_random_unlabeled_idx(self, _: int) -> np.ndarray:
+        return self._rng.integers(0, self._unlabeled_len)

-    def _get_deterministic_target_idx(self, idx: int) -> int:
-        return self._target_idx[idx]
+    def _get_deterministic_unlabeled_idx(self, idx: int) -> np.ndarray:
+        return self._unlabeled_idx[idx]

-    def __getitem__(self, idx: int) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor]:
-        target_idx = self._get_target_idx(idx)
-        source, source_label = self.source[idx]
-        target, _ = self.target[target_idx]
-        return source, source_label, target
+    def __getitem__(self, idx: int) -> Tuple[torch.Tensor, ...]:
+        item = self.labeled[idx]
+        for unlabeled, ul_idx in zip(self.unlabeled, self._get_unlabeled_idx(idx)):
+            item += unlabeled[ul_idx][:-1]  # drop label tensor in last position
+
+        return item

def __len__(self) -> int:
-        return len(self.source)  # type: ignore
+        return len(self.labeled)  # type: ignore
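A short sketch of the generalized dataset with two unlabeled datasets (the tensor shapes are arbitrary):

```python
import torch
from torch.utils.data import TensorDataset

from rul_datasets.adaption import AdaptionDataset

labeled = TensorDataset(torch.randn(100, 14, 20), torch.rand(100))
unlabeled_a = TensorDataset(torch.randn(80, 14, 20), torch.rand(80))
unlabeled_b = TensorDataset(torch.randn(60, 14, 20), torch.rand(60))

# one random feature sample per unlabeled dataset, labels dropped
dataset = AdaptionDataset(labeled, unlabeled_a, unlabeled_b)
features, label, features_a, features_b = dataset[0]

# deterministic=True fixes the pairing, e.g., for reproducible evaluation
eval_dataset = AdaptionDataset(labeled, unlabeled_a, deterministic=True)
assert eval_dataset[0][-1].equal(eval_dataset[0][-1])
```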


class PretrainingAdaptionDataModule(pl.LightningDataModule):
2 changes: 1 addition & 1 deletion rul_datasets/reader/__init__.py
@@ -46,7 +46,7 @@
(163, 30, 14)
```
-The targets are a list of [numpy arrays][numpy.ndarrays], too, where each array has a
+The targets are a list of [numpy arrays][numpy.ndarray], too, where each array has a
shape of `[num_windows]`:
```pycon
10 changes: 6 additions & 4 deletions rul_datasets/utils.py
@@ -95,13 +95,15 @@ def download_file(url: str, save_path: str) -> None:


def to_tensor(
-    features: List[np.ndarray], targets: List[np.ndarray]
-) -> Tuple[List[torch.Tensor], List[torch.Tensor]]:
+    features: List[np.ndarray], *targets: List[np.ndarray]
+) -> Tuple[List[torch.Tensor], ...]:
dtype = torch.float32
tensor_feats = [feature_to_tensor(f, dtype) for f in features]
-    tensor_targets = [torch.tensor(t, dtype=dtype) for t in targets]
+    tensor_targets = [
+        [torch.tensor(t, dtype=dtype) for t in target] for target in targets
+    ]

-    return tensor_feats, tensor_targets
+    return tensor_feats, *tensor_targets


def feature_to_tensor(features: np.ndarray, dtype: torch.dtype) -> torch.Tensor:
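A small sketch of the generalized `to_tensor`, which now accepts any number of target lists and returns one list of tensors per positional argument (the array shapes are made up and assume windowed features of shape `(num_windows, window_size, channels)`):

```python
import numpy as np

from rul_datasets import utils

features = [np.random.randn(5, 30, 14).astype(np.float32)]
rul = [np.arange(5.0, 0.0, -1.0)]
steps = [np.arange(5.0)]

# one list of tensors comes back per positional argument
feats, ruls, step_tensors = utils.to_tensor(features, rul, steps)
```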