diff --git a/python/tvm/meta_schedule/cost_model/mlp_model.py b/python/tvm/meta_schedule/cost_model/mlp_model.py
index 1f96a9ce8a688..5024e545b6588 100644
--- a/python/tvm/meta_schedule/cost_model/mlp_model.py
+++ b/python/tvm/meta_schedule/cost_model/mlp_model.py
@@ -18,6 +18,7 @@
 MLP-based cost model
 """
+import glob
 import logging
 import math
 import os
@@ -35,7 +36,9 @@
 # pylint: disable=relative-beyond-top-level
 from ...contrib.tar import tar, untar
 from ...runtime import NDArray
+from ...target import Target
 from ..cost_model import PyCostModel
+from ..database import JSONDatabase
 from ..feature_extractor import FeatureExtractor, PerStoreFeature
 from ..runner import RunnerResult
 from ..search_strategy import MeasureCandidate
@@ -142,9 +145,9 @@ class SegmentDataLoader:
     def __init__(
         self,
         features,
-        results,
+        results=None,
         batch_size=128,
-        shuffle=False,
+        shuffle=True,
     ):
         self.batch_size = batch_size
         self.shuffle = shuffle
@@ -156,10 +159,10 @@ def __init__(
             torch.cumsum(self.segment_sizes, 0, dtype=torch.int32) - self.segment_sizes
         )
         features = torch.cat([torch.tensor(feature) for feature in features])
-        norm = features.max(dim=0)[0]
+        norm, _ = features.max(dim=0)
         norm[norm == 0] = 1
         self.features = features / norm
-        self.results = torch.tensor(results)
+        self.results = torch.tensor(results) if results is not None else None
         self.iter_order = self.pointer = None

     def __len__(self):
@@ -188,11 +191,14 @@ def _fetch_indices(self, indices):
             feature_indices[idx : idx + seg_size] = torch.arange(offset, offset + seg_size)
             idx += seg_size
         features = self.features[feature_indices.long()]
-        results = self.results[indices.long()]
-        return segment_sizes, features, results
+        if self.results is None:
+            return segment_sizes, features
+        else:
+            results = self.results[indices.long()]
+            return segment_sizes, features, results


-class SegmentSumMLPModule(nn.Module):
+class SegmentSumMLP(nn.Module):
     """SegmentSum MLP model.

     Parameters
@@ -354,76 +360,416 @@ def topk_score(
         return score.item()


+class SegmentSumMLPTrainer:
+    """Handles both full and incremental training.
+
+    The data is stored as an attribute of the trainer: after ``update`` adds new data,
+    the user can decide whether to retrain on all of the data or to train incrementally.
+    Note: the Preprocessing helper only pre-processes raw data; all data is stored
+    in the trainer."""
+
+    model: SegmentSumMLP
+    data: Dict[str, FeatureGroup]
+    data_size: int
+    untrained_size: int
+
+    def __init__(
+        self,
+        model,
+        data,
+        data_size,
+        batch_size=128,
+        learning_rate=7e-4,
+        weight_decay=1e-6,
+        num_epoch=50,
+        num_epoch_incremental=5,
+        grad_clip_norm=0.5,
+        train_verbose=25,
+        test_split=0.2,
+        test_interval=5,
+    ):
+        self.model = model
+        self.data = data
+        self.data_size = data_size
+        # used by the heuristic that chooses between full and incremental training
+        self.untrained_size = data_size
+        self.batch_size = batch_size
+        self.learning_rate = learning_rate
+        self.weight_decay = weight_decay
+        self.num_epoch = num_epoch
+        # number of epochs used for incremental training
+        self.num_epoch_incremental = num_epoch_incremental
+        self.grad_clip_norm = grad_clip_norm
+        self.train_verbose = train_verbose
+        self.test_split = test_split
+        self.test_interval = test_interval
+
+        self.device = "cuda" if torch.cuda.device_count() else "cpu"
+        self.optimizer = torch.optim.Adam(
+            self.model.parameters(), lr=learning_rate, weight_decay=weight_decay
+        )
+        self.scheduler = torch.optim.lr_scheduler.StepLR(
+            self.optimizer, step_size=num_epoch // 3, gamma=0.7
+        )
+
+    def train_step(self, data, batch=0, train_loss=None):
+        """Train on a mini-batch.
+        data: the current batch of data
+        train_loss: the running average of the training loss"""
+        segment_sizes, features, gt_results = (
+            data[0].to(self.device),
+            data[1].to(self.device),
+            data[2].to(self.device),
+        )
+        self.optimizer.zero_grad()
+        pred_results = self.model(segment_sizes, features)
+        loss = self.model.lambda_rank_loss(pred_results, gt_results)
+        loss.backward()
+        torch.nn.utils.clip_grad_norm_(self.model.parameters(), self.grad_clip_norm)
+        self.optimizer.step()
+        loss = loss.detach().cpu()
+        train_loss = (
+            train_loss * 0.95 + loss.item() * 0.05 if train_loss is not None else loss.item()
+        )
+        segment_sizes, features, gt_results, pred_results = (
+            segment_sizes.detach().cpu(),
+            features.detach().cpu(),
+            gt_results.detach().cpu(),
+            pred_results.detach().cpu(),
+        )
+        if batch % self.train_verbose == 0:
+            logger.info("Batch: %d, train loss: %6f", batch, train_loss)
+        del pred_results
+        del loss
+        return train_loss
+
+    def validate_step(self, data):
+        """Validate on a mini-batch, or run prediction on it.
+        data: the current batch of data"""
+        segment_sizes, features = (
+            data[0].to(self.device),
+            data[1].to(self.device),
+        )
+        gt_results = data[2] if len(data) > 2 else None
+        pred_results = self.model(segment_sizes, features)
+        segment_sizes, features, pred_results = (
+            segment_sizes.detach().cpu(),
+            features.detach().cpu(),
+            pred_results.detach().cpu(),
+        )
+        if gt_results is None:
+            return pred_results.numpy()
+        test_loss_batch = self.model.lambda_rank_loss(pred_results, gt_results).item()
+        test_scores_batch = []
+        for k in [1, 5, 10]:
+            test_scores_batch.append(self.model.topk_score(pred_results, gt_results, k))
+        del pred_results
+        return test_loss_batch, test_scores_batch
+
+    def train_full(self):
+        """Train on all of the data stored so far."""
+        # when training on the full data, split it into training and testing sets
+        self.untrained_size = 0
+        keys = list(self.data.keys())
+        test_keys = random.sample(keys, k=math.floor(len(keys) * self.test_split))
+        train_data = OrderedDict()
+        test_data = OrderedDict()
+        for key in keys:
+            if key in test_keys:
+                test_data[key] = self.data[key]
+            else:
+                train_data[key] = self.data[key]
+        train_features = list(
+            itertools_chain.from_iterable([g.features for g in train_data.values()])
+        )
+        test_features = list(
+            itertools_chain.from_iterable([g.features for g in test_data.values()])
+        )
+        train_results = np.concatenate([g.min_cost / g.costs for g in train_data.values()])
+        test_results = np.concatenate([g.min_cost / g.costs for g in test_data.values()])
+        train_loader = SegmentDataLoader(
+            train_features, train_results, batch_size=self.batch_size, shuffle=True
+        )
+        test_loader = SegmentDataLoader(
+            test_features, test_results, batch_size=self.batch_size, shuffle=False
+        )
+        logger.info("Training size: %d, testing size: %d", len(train_loader), len(test_loader))
+
+        self.model = self.model.to(self.device)
+        # used to save the best model
+        min_test_loss = 1e10
+        with tempfile.TemporaryDirectory() as tmp_dir:
+            model_cache_path = os.path.join(tmp_dir, "best_model.pth")
+            for epoch in range(self.num_epoch):
+                logger.info("Epoch: %d", epoch)
+                # training
+                self.model.train()
+                train_loss = None
+                for batch, data in tqdm(enumerate(train_loader)):
+                    train_loss = self.train_step(data, batch, train_loss)
+                self.scheduler.step()
+                # testing
+                if epoch % self.test_interval == 0:
+                    self.model.eval()
+                    test_losses, test_scores = [], []
+                    for batch, data in tqdm(enumerate(test_loader)):
+                        test_loss_batch, test_scores_batch = self.validate_step(data)
+                        test_losses.append(test_loss_batch)
+                        test_scores.append(test_scores_batch)
+                    test_loss = (
+                        np.array(test_losses[:-1]).mean() if len(test_losses) > 1 else test_losses[0]
+                    )
+                    logger.info(
+                        "Average test loss: %6f, top1 score: %5f, top5 score: %5f, top10 score: %5f",
+                        test_loss,
+                        np.array(test_scores)[:, 0].mean(),
+                        np.array(test_scores)[:, 1].mean(),
+                        np.array(test_scores)[:, 2].mean(),
+                    )
+                    if test_loss < min_test_loss:
+                        min_test_loss = test_loss
+                        torch.save(self.model.state_dict(), model_cache_path)
+            self.model.to("cpu").load_state_dict(torch.load(model_cache_path))
+
+    def train_incremental(self, features, results):
+        """Train only on the newly added data.
+        Since we train on all of the newly added data, we do not split it into
+        training and testing sets."""
+        self.untrained_size = max(0, self.untrained_size - len(features))
+        results = np.min(results) / results
+        loader = SegmentDataLoader(features, results, batch_size=self.batch_size, shuffle=True)
+        logger.info("Incremental training size: %d", len(loader))
+
+        self.model = self.model.to(self.device)
+        # used to save the best model
+        min_loss = 1e10
+        with tempfile.TemporaryDirectory() as tmp_dir:
+            model_cache_path = os.path.join(tmp_dir, "best_model.pth")
+            for _ in range(self.num_epoch_incremental):
+                # training
+                self.model.train()
+                loss = None
+                for batch, data in tqdm(enumerate(loader)):
+                    loss = self.train_step(data, batch, loss)
+                self.scheduler.step()
+                if loss < min_loss:
+                    min_loss = loss
+                    torch.save(self.model.state_dict(), model_cache_path)
+            self.model.to("cpu").load_state_dict(torch.load(model_cache_path))
+
+    def validate_incremental(self, features, results=None):
+        """Validate incremental data before training on it, or predict scores for the
+        input features when results are None."""
+        if results is not None:
+            results = np.min(results) / results
+        loader = SegmentDataLoader(
+            features, results, batch_size=self.batch_size, shuffle=False
+        )
+
+        self.model = self.model.to(self.device).eval()
+        if results is not None:
+            # validation
+            test_losses, test_scores = [], []
+            for _, data in tqdm(enumerate(loader)):
+                test_loss_batch, test_scores_batch = self.validate_step(data)
+                test_losses.append(test_loss_batch)
+                test_scores.append(test_scores_batch)
+            test_loss = (
+                np.array(test_losses[:-1]).mean() if len(test_losses) > 1 else test_losses[0]
+            )
+            logger.info(
+                "Average test loss: %6f, top1 score: %5f, top5 score: %5f, top10 score: %5f",
+                test_loss,
+                np.array(test_scores)[:, 0].mean(),
+                np.array(test_scores)[:, 1].mean(),
+                np.array(test_scores)[:, 2].mean(),
+            )
+        else:
+            # prediction
+            all_pred_results = []
+            for _, data in tqdm(enumerate(loader)):
+                pred_results = self.validate_step(data)
+                all_pred_results.append(pred_results)
+            return np.concatenate(all_pred_results)
+
+    def update(self, features, results, group_hash, frozen=False):
+        """Update with newly extracted features and results and store them in the data groups.
+        A simple heuristic decides whether to call train_full or train_incremental."""
+        group = self.data.get(group_hash, None)
+        if group is None:
+            group = FeatureGroup(
+                group_hash=group_hash,
+                features=features,
+                costs=results,
+            )
+        else:
+            group.append(features, results)
+        self.data[group_hash] = group
+        self.data_size += len(features)
+        self.untrained_size += len(features)
+        # use a heuristic to decide between full and incremental training
+        if not frozen:
+            # validate first, because this is the only reasonable place to validate
+            # during incremental training
+            self.validate_incremental(features, results)
+            if self.untrained_size / self.data_size > 0.2:
+                self.train_full()
+            else:
+                self.train_incremental(features, results)
+
+    def save(self, path):
+        """Save from the trainer, since it holds both the data and the updated model."""
+        with tempfile.TemporaryDirectory() as tmp_dir:
+            model_path = os.path.join(tmp_dir, "model.pth")
+            data_path = os.path.join(tmp_dir, "data.npy")
+            # Step 1. Save the model
+            torch.save(self.model.state_dict(), model_path)
+            # Step 2. Save data
+            data = [
+                (
+                    g.group_hash,
+                    g.features,
+                    g.costs,
+                )
+                for g in self.data.values()
+            ]
+            np.save(
+                file=data_path,
+                arr=np.array(data, dtype=object),
+            )
+            # Step 3. Tar it
+            tar(path, [x for x in [model_path, data_path] if x is not None])
+        logger.info("Saved MLPModel to %s", path)
+
+
+class Preprocessing:
+    """Handles processing raw data into feature vectors.
+    Untars three things:
+    1) model: if it exists, training starts from the saved model.
+    2) cached data: extracted features and results; if it exists,
+    raw data is skipped.
+    3) raw data: features and results are extracted from the raw data.
+    """
+
+    model: SegmentSumMLP
+    data: Dict[str, FeatureGroup]
+    data_size: int
+
+    def __init__(
+        self, model_config=SegmentSumMLPConfig(), extractor=PerStoreFeature(extract_workload=True)
+    ):
+        # the model is initialized here
+        self.model = SegmentSumMLP(**model_config.to_dict())
+        self.data = OrderedDict()
+        self.data_size = 0
+        self.extractor = extractor
+
+    def extract_features(self, candidates, context, results=None):
+        """Extract features and results into the form that can be
+        added into a feature group."""
+
+        def _feature(feature: NDArray) -> np.ndarray:
+            return feature.numpy().astype("float32")
+
+        def _mean_cost(res: RunnerResult) -> float:
+            if not res.run_secs:
+                return 1e10
+            return float(np.median([float(s) for s in res.run_secs]))
+
+        new_features = [_feature(x) for x in self.extractor.extract_from(context, candidates)]
+        if results is None:
+            return new_features
+        else:
+            new_mean_costs = np.array([_mean_cost(x) for x in results]).astype("float32")
+            return new_features, new_mean_costs
+
+    def add_to_group(self, features, results, group_hash):
+        """Add features and results to the data group."""
+        group = self.data.get(group_hash, None)
+        if group is None:
+            group = FeatureGroup(
+                group_hash=group_hash,
+                features=features,
+                costs=results,
+            )
+        else:
+            group.append(features, results)
+        self.data[group_hash] = group
+        self.data_size += len(features)
+
+    def load_model_and_data(self, path):  # pylint: disable=too-many-locals
+        """Main function for loading and processing data."""
+        with tempfile.TemporaryDirectory() as tmp_dir:
+            model_path = os.path.join(tmp_dir, "model.pth")
+            cache_path = os.path.join(tmp_dir, "cached_data.npy")
+            raw_path = os.path.join(tmp_dir, "raw_data")
+            untar(path, tmp_dir)
+            if os.path.exists(model_path):
+                self.model.load_state_dict(torch.load(model_path))
+            if os.path.exists(cache_path):
+                for group_hash, features, costs in np.load(cache_path, allow_pickle=True):
+                    self.data[group_hash] = FeatureGroup(
+                        group_hash=group_hash,
+                        features=list(features),
+                        costs=costs,
+                    )
+                    self.data_size += len(costs)
+            elif os.path.exists(raw_path):
+                model_dirs = glob.glob(os.path.join(raw_path, "*"))
+                workload_paths = []
+                for model_dir in model_dirs:
+                    json_files = glob.glob(os.path.join(model_dir, "*.json"))
+                    for json_file in json_files:
+                        if json_file.endswith("_workload.json"):
+                            workload_paths.append(json_file)
+                for workload_path in tqdm(workload_paths):
+                    database = JSONDatabase(
+                        path_workload=workload_path,
+                        path_tuning_record=workload_path.replace(
+                            "_workload.json", "_candidates.json"
+                        ),
+                    )
+                    candidates, results = [], []
+                    tuning_records = database.get_all_tuning_records()
+                    for record in tuning_records:
+                        candidates.append(record.as_measure_candidate())
+                        results.append(RunnerResult(run_secs=record.run_secs, error_msg=None))
+                    assert len(candidates) == len(results)
+                    context = TuneContext(
+                        mod=tuning_records[0].workload.mod, target=Target("nvidia/nvidia-v100")
+                    )
+                    features, results = self.extract_features(candidates, context, results)
+                    self.add_to_group(features, results, shash2hex(context.mod))
+            else:
+                raise ValueError("Need to provide at least one of cached data or raw data.")
ValueError("Need to provide at least one of cached data or raw data.") + + @derived_object class MLPModel(PyCostModel): - """MLP model + """MLP model; integration with TVM Parameters ---------- - extractor : FeatureExtractor - The feature extractor for the model. - config : SegmentSumMLPConfig - The SegmentSum MLP model config. - num_epoch : int - Number of epoches. - learning_rate : float - Learning rate. - grad_clip : float - Gradient clipping max norm. - weight_decay : float - Adam weight decay. - batch_size : int - The batch size for dataloader. - test_split : float - The portion of the testing set. - test_interval : int - The testing interval (in number of epoches). - train_verbose : int - The verbose frequency for training (in number of batches). + Hmm.. None for now? + Can pass in training hyperparams to Trainer here. Basically, it's a wrapper. """ extractor: FeatureExtractor config: SegmentSumMLPConfig - num_epoch: int - learning_rate: float - grad_clip: float - weight_decay: float - batch_size: int - test_split: float - test_interval: int - train_verbose: int - data: Dict[str, FeatureGroup] - data_size: int + preprocessor: Preprocessing + trainer: SegmentSumMLPTrainer def __init__( self, *, extractor: FeatureExtractor = PerStoreFeature(extract_workload=True), config: SegmentSumMLPConfig = SegmentSumMLPConfig(), - num_epoch: int = 50, - learning_rate: float = 7e-4, - grad_clip: float = 0.5, - weight_decay: float = 1e-6, - batch_size: int = 128, - test_split: float = 0.2, - test_interval: int = 5, - train_verbose: int = 25, + preprocessor: Preprocessing = Preprocessing(), + trainer: SegmentSumMLPTrainer = None, ): super().__init__() self.extractor = extractor self.config = config - self.num_epoch = num_epoch - self.learning_rate = learning_rate - self.grad_clip = grad_clip - self.weight_decay = weight_decay - self.batch_size = batch_size - self.test_split = test_split - self.test_interval = test_interval - self.train_verbose = train_verbose - self.model = SegmentSumMLPModule(**self.config.to_dict()) - self.device = "cuda" if torch.cuda.device_count() else "cpu" - self.data = OrderedDict() - self.data_size = 0 + self.preprocessor = preprocessor + self.trainer = trainer def load(self, path: str) -> None: """Load the cost model from given file location. @@ -438,27 +784,12 @@ def load(self, path: str) -> None: To expedite data loading and processing, each time this method loads the model together with previously cached feature vectors if exist. """ - with tempfile.TemporaryDirectory() as tmp_dir: - model_path = os.path.join(tmp_dir, "model.pth") - data_path = os.path.join(tmp_dir, "data.npy") - # Step 1. Untar - untar(path, tmp_dir) - # Step 2. Load data - if os.path.exists(data_path): - data = OrderedDict() - data_size = 0 - for group_hash, features, costs in np.load(data_path, allow_pickle=True): - data[group_hash] = FeatureGroup( - group_hash=group_hash, - features=list(features), - costs=costs, - ) - data_size += len(costs) - self.data = data - self.data_size = data_size - # Step 3. Load the model - if os.path.exists(model_path): - self.model.load_state_dict(torch.load(model_path)) + self.preprocessor.load_model_and_data(path) + self.trainer = SegmentSumMLPTrainer( + self.preprocessor.model, + self.preprocessor.data, + self.preprocessor.data_size, + ) def save(self, path: str) -> None: """Save the cost model to given file location. 
@@ -473,34 +804,14 @@ def save(self, path: str) -> None:
         To expedite data loading and processing, each time this method saves the model
         together with previously cached feature vectors.
         """
-        with tempfile.TemporaryDirectory() as tmp_dir:
-            model_path = os.path.join(tmp_dir, "model.pth")
-            data_path = os.path.join(tmp_dir, "data.npy")
-            # Step 1. Save the model
-            torch.save(self.model.state_dict(), model_path)
-            # Step 2. Save data
-            data = [
-                (
-                    g.group_hash,
-                    g.features,
-                    g.costs,
-                )
-                for g in self.data.values()
-            ]
-            np.save(
-                file=data_path,
-                arr=np.array(data, dtype=object),
-            )
-            # Step 3. Tar it
-            tar(path, [x for x in [model_path, data_path] if x is not None])
-        logger.info("Saved MLPModel to %s", path)
+        self.trainer.save(path)

     def update(
         self,
         context: TuneContext,
         candidates: List[MeasureCandidate],
         results: List[RunnerResult],
-        skip_model: bool = False,
+        frozen: bool = False,
     ) -> None:
         """Update the cost model given running results.

@@ -512,57 +823,17 @@ def update(
             The measure candidates.
         results : List[RunnerResult]
             The running results of the measure candidates.
-        skip_model : bool
+        frozen : bool
             Skip updating the cost model, only load the data into runtime.
         """
-        assert len(candidates) == len(results)
-        if len(candidates) == 0:
-            return
-
-        # Step 1. Get the feature group
-        new_group_hash = shash2hex(context.mod)
-        group = self.data.get(new_group_hash, None)
-
-        # Step 2. Extract features
-        def _feature(feature: NDArray) -> np.ndarray:
-            return feature.numpy().astype("float32")
-
-        def _mean_cost(res: RunnerResult) -> float:
-            if not res.run_secs:
-                return 1e10
-            return float(np.median([float(s) for s in res.run_secs]))
-
-        new_features = [_feature(x) for x in self.extractor.extract_from(context, candidates)]
-        new_mean_costs = np.array([_mean_cost(x) for x in results]).astype("float32")
-
-        # Steps 3. Run validation
-        if not skip_model and group is not None:
-            logger.debug(
-                "MLP validation: %s",
-                "\t".join(
-                    f"{key}: {score:.6f}"
-                    for key, score in self._validate(
-                        features=new_features,
-                        gt_results=group.min_cost / new_mean_costs,
-                    )
-                ),
+        if self.trainer is None:
+            self.trainer = SegmentSumMLPTrainer(
+                self.preprocessor.model,
+                self.preprocessor.data,
+                self.preprocessor.data_size,
             )
-
-        # Step 4. Add the features into the data points
-        if group is None:
-            group = FeatureGroup(
-                group_hash=new_group_hash,
-                features=new_features,
-                costs=new_mean_costs,
-            )
-        else:
-            group.append(new_features, new_mean_costs)
-        self.data[new_group_hash] = group
-        self.data_size += len(new_features)
-
-        # Step 5. Re-train the model
-        if not skip_model:
-            self._train()
+        features, results = self.preprocessor.extract_features(candidates, context, results)
+        self.trainer.update(features, results, shash2hex(context.mod), frozen)

     def predict(self, context: TuneContext, candidates: List[MeasureCandidate]) -> np.ndarray:
         """Predict the normalized score using the cost model.
@@ -579,165 +850,7 @@ def predict(self, context: TuneContext, candidates: List[MeasureCandidate]) -> n
         result : np.ndarray
             The predicted normalized score.
""" - features = [ - torch.tensor(x.numpy().astype("float32")) - for x in self.extractor.extract_from(context, candidates) - ] - segment_sizes = torch.tensor([len(feature) for feature in features]).to(self.device) - features = torch.cat(features).to(self.device) - norm = features.max(dim=0)[0] - norm[norm == 0] = 1 - features /= norm - # begin predicting - self.model = self.model.to(self.device) - self.model.eval() - result = self.model(segment_sizes, features).detach().cpu().numpy() - return result - - def _train(self) -> None: # pylint: disable=too-many-locals,too-many-statements - """Train the MLP model using all the data in the runtime.""" - # split into training and testing set - keys = list(self.data.keys()) - test_keys = random.sample(keys, k=math.floor(len(keys) * self.test_split)) - test_data = OrderedDict() - for key in test_keys: - test_data[key] = self.data[key] - del self.data[key] - train_features = list( - itertools_chain.from_iterable([g.features for g in self.data.values()]) - ) - test_features = list( - itertools_chain.from_iterable([g.features for g in test_data.values()]) - ) - train_results = np.concatenate([g.min_cost / g.costs for g in self.data.values()]) - test_results = np.concatenate([g.min_cost / g.costs for g in test_data.values()]) - train_loader = SegmentDataLoader( - train_features, train_results, batch_size=self.batch_size, shuffle=True - ) - test_loader = SegmentDataLoader( - test_features, test_results, batch_size=self.batch_size, shuffle=False - ) - self.data, test_data = None, None # save memory - logger.info("Training size: %d, testing size: %d", len(train_loader), len(test_loader)) - - # begin training - self.model = self.model.to(self.device) - optimizer = torch.optim.Adam( - self.model.parameters(), lr=self.learning_rate, weight_decay=self.weight_decay - ) - scheduler = torch.optim.lr_scheduler.StepLR( - optimizer, step_size=self.num_epoch // 3, gamma=0.7 - ) - min_test_loss = 1e10 - with tempfile.TemporaryDirectory() as tmp_dir: - model_cache_path = os.path.join(tmp_dir, "best_model.pth") - for epoch in range(self.num_epoch): - logger.info("Epoch: %d", epoch) - # training - self.model.train() - train_loss = None - for batch, (segment_sizes, features, gt_results) in tqdm(enumerate(train_loader)): - optimizer.zero_grad() - segment_sizes, features, gt_results = ( - segment_sizes.to(self.device), - features.to(self.device), - gt_results.to(self.device), - ) - pred_results = self.model(segment_sizes, features) - loss = self.model.lambda_rank_loss(pred_results, gt_results) - loss.backward() - torch.nn.utils.clip_grad_norm_(self.model.parameters(), self.grad_clip) - optimizer.step() - loss = loss.detach().cpu() - train_loss = ( - train_loss * 0.95 + loss.item() * 0.05 - if train_loss is not None - else loss.item() - ) - segment_sizes, features, gt_results, pred_results = ( - segment_sizes.detach().cpu(), - features.detach().cpu(), - gt_results.detach().cpu(), - pred_results.detach().cpu(), - ) - if batch % self.train_verbose == 0: - logger.info("Batch: %d, train loss: %6f", batch, train_loss) - del pred_results - del loss - scheduler.step() - # testing - if epoch % self.test_interval == 0: - self.model.eval() - test_scores, test_losses = [], [] - for batch, (segment_sizes, features, gt_results) in tqdm(enumerate(test_loader)): - segment_sizes, features = ( - segment_sizes.to(self.device), - features.to(self.device), - ) - pred_results = self.model(segment_sizes, features) - segment_sizes, features, pred_results = ( - segment_sizes.detach().cpu(), - 
-                            features.detach().cpu(),
-                            pred_results.detach().cpu(),
-                        )
-                        test_losses.append(self.model.lambda_rank_loss(pred_results, gt_results).item())
-                        scores = []
-                        for k in [1, 5, 10]:
-                            scores.append(self.model.topk_score(pred_results, gt_results, k))
-                        test_scores.append(scores)
-                        del pred_results
-                    test_loss = (
-                        np.array(test_losses[:-1]).mean() if len(test_losses) > 1 else test_losses[0]
-                    )
-                    logger.info(
-                        "Average test loss: %6f, top1 score: %5f, top5 score: %5f, top10 score: %5f",
-                        test_loss,
-                        np.array(test_scores)[:, 0].mean(),
-                        np.array(test_scores)[:, 1].mean(),
-                        np.array(test_scores)[:, 2].mean(),
-                    )
-                    if test_loss < min_test_loss:
-                        min_test_loss = test_loss
-                        torch.save(self.model.state_dict(), model_cache_path)
-            self.model.to("cpu").load_state_dict(torch.load(model_cache_path))
-
-    def _validate(self, features: List[np.ndarray], gt_results: np.ndarray) -> Dict[str, float]:
-        """Run validation without a test dataset.
-
-        Parameters
-        ----------
-        features : List[np.ndarray]
-            The features
-        gt_results : np.ndarray
-            The measured results
-
-        Returns
-        -------
-        result : Dict[str, float]
-            The validation result.
-        """
-        segment_sizes = torch.tensor([len(feature) for feature in features]).to(self.device)
-        features = torch.cat([torch.tensor(feature) for feature in features]).to(self.device)
-        norm = features.max(dim=0)[0]
-        norm[norm == 0] = 1
-        features /= norm
-        gt_results = torch.tensor(gt_results)
-        # begin validating
-        self.model = self.model.to(self.device)
-        self.model.eval()
-        pred_results = self.model(segment_sizes, features)
-        segment_sizes, features, pred_results = (
-            segment_sizes.detach().cpu(),
-            features.detach().cpu(),
-            pred_results.detach().cpu(),
-        )
-        loss = self.model.lambda_rank_loss(pred_results, gt_results).item()
-        scores = []
-        for k in [1, 5, 10]:
-            scores.append(self.model.topk_score(pred_results, gt_results, k))
-        return {
-            "loss": loss,
-            "top1_score": scores[0],
-            "top5_score": scores[1],
-            "top10_score": scores[2],
-        }
+        assert self.trainer is not None
+        # don't need to add new data into self.data for now
+        features = self.preprocessor.extract_features(candidates, context)
+        return self.trainer.validate_incremental(features)
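
A minimal sketch of how the refactored pieces are intended to compose, based only on the
interfaces added in this diff; the archive paths, `mod`, `candidates`, and `runner_results`
below are hypothetical placeholders, not values from this patch.

    from tvm.meta_schedule import TuneContext
    from tvm.meta_schedule.cost_model.mlp_model import MLPModel
    from tvm.target import Target

    cost_model = MLPModel()                    # wraps a Preprocessing instance; the trainer is created lazily
    cost_model.load("pretrained_bundle.tar")   # Preprocessing.load_model_and_data + trainer construction

    # `mod` is the tuned IRModule; `candidates` / `runner_results` come from the search strategy and runner.
    context = TuneContext(mod=mod, target=Target("nvidia/nvidia-v100"))

    # update(): extract features, store them in a FeatureGroup keyed by shash2hex(context.mod), then
    # the trainer retrains fully if more than 20% of the stored data is untrained, else incrementally.
    cost_model.update(context, candidates, runner_results)

    scores = cost_model.predict(context, candidates)  # trainer.validate_incremental(features) with results=None
    cost_model.save("updated_bundle.tar")             # delegates to SegmentSumMLPTrainer.save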