-
Notifications
You must be signed in to change notification settings - Fork 20
/
eval_biwi.py
executable file
·169 lines (138 loc) · 5.24 KB
/
eval_biwi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
#############################################################################################################
# this file is for evaluation on the BIWI dataset
#############################################################################################################
import os
import pickle
import torch
import json
import os
from pathlib import Path
from omegaconf import OmegaConf
import numpy as np
from alm.config import parse_args
from alm.models.get_model import get_model
from alm.utils.logger import create_logger
from alm.utils.demo_utils import animate
from rich import get_console
from rich.table import Table
import pytorch_lightning as pl
from alm.data.get_data import get_datasets
from alm.callback import ProgressLogger
from tqdm import tqdm
# Create the module-level lists vertices_gt_all, vertices_pred_all and
# motion_std_difference unless all three already exist in this namespace
# (e.g. when the script is re-executed inside a live session).
# The guard must test the same names that are created below: the previous
# version probed `vertices_pred_list` / `vertices_gt_list`, which are never
# defined, so the lists were unconditionally reset.
# If any one of the three is missing, all three are (re)created together.
if not all(name in globals() for name in
           ("vertices_gt_all", "vertices_pred_all", "motion_std_difference")):
    vertices_gt_all = []
    vertices_pred_all = []
    motion_std_difference = []
def _load_region_map(filename):
    """Read ``datasets/biwi/regions/<filename>`` and return its entries as ints.

    The file content is split on ``", "`` and every piece is converted with
    ``int``; a missing file or a non-integer entry raises as usual.
    """
    with open(os.path.join('datasets/biwi/regions', filename)) as f:
        return [int(i) for i in f.read().split(", ")]


# load the mouth and upper face region, unless a map with that name is
# already defined in this namespace (presumably lists of vertex indices --
# TODO confirm against the code that consumes them)
if "mouth_map" not in globals():
    mouth_map = _load_region_map("lve.txt")
if "upper_map" not in globals():
    upper_map = _load_region_map("fdd.txt")
def print_table(title, metrics):
    """Print ``metrics`` as a centered two-column table on the rich console.

    Args:
        title: Caption shown above the table.
        metrics: Mapping from metric name to value; each value is rendered
            with ``str``.
    """
    grid = Table(title=title)
    grid.add_column("Metrics", style="cyan", no_wrap=True)
    grid.add_column("Value", style="magenta")
    for metric_name, metric_value in metrics.items():
        rendered = str(metric_value)
        grid.add_row(metric_name, rendered)
    get_console().print(grid, justify="center")
def get_metric_statistics(values, replication_times):
    """Return the mean and confidence-interval half-width of ``values``.

    Both statistics are reduced along axis 0. The half-width is
    ``1.96 * std / sqrt(replication_times)`` (1.96 is presumably the
    two-sided 95% normal quantile), using NumPy's default population
    standard deviation.

    Args:
        values: Array-like of per-replication metric values.
        replication_times: Number of replications used in the denominator.

    Returns:
        A ``(mean, conf_interval)`` tuple.
    """
    center = np.mean(values, axis=0)
    spread = np.std(values, axis=0)
    root_n = np.sqrt(replication_times)
    half_width = 1.96 * spread / root_n
    return center, half_width
def main():
    """Evaluate a checkpoint over several replications and save the metrics.

    In order: parse the test configuration; create the logger and the samples
    output directory; seed with ``cfg.SEED_VALUE``; build the datasets, model
    and Lightning trainer; load the weights from ``cfg.TEST.CHECKPOINTS``;
    run ``trainer.test`` ``cfg.TEST.REPLICATION_TIMES`` times; print the
    per-metric mean and confidence interval; and write those, together with
    the raw per-replication values, to ``metrics_<cfg.TIME>.json`` in the
    parent of the samples directory.
    """
    # parse options
    cfg = parse_args(phase="test")
    cfg.FOLDER = cfg.TEST.FOLDER
    # NOTE(review): this assigns `cfg.Name`, while the rest of this function
    # reads `cfg.NAME` -- confirm which attribute is intended.
    cfg.Name = "demo--" + cfg.NAME

    # set up the logger
    logger = create_logger(cfg, phase="test")
    output_dir = Path(
        os.path.join(cfg.FOLDER, str(cfg.model.model_type), str(cfg.NAME),
                     "samples_" + cfg.TIME))
    output_dir.mkdir(parents=True, exist_ok=True)
    logger.info(OmegaConf.to_yaml(cfg))

    # set seed
    pl.seed_everything(cfg.SEED_VALUE)

    # create dataset
    datasets = get_datasets(cfg, logger=logger, phase="test")[0]
    logger.info("datasets module {} initialized".format("".join(
        cfg.TRAIN.DATASETS)))

    # create model
    model = get_model(cfg, datasets)
    logger.info("model {} loaded".format(cfg.model.model_type))

    # monitor
    metric_monitor = {
        'none': None  # TODO
    }

    # callbacks
    callbacks = [
        pl.callbacks.RichProgressBar(),  # type: ignore
        ProgressLogger(metric_monitor=metric_monitor),
    ]

    # trainer
    trainer = pl.Trainer(
        benchmark=False,
        max_epochs=cfg.TRAIN.END_EPOCH,
        accelerator=cfg.ACCELERATOR,
        devices=list(range(len(cfg.DEVICE))),
        default_root_dir=cfg.FOLDER_EXP,
        reload_dataloaders_every_n_epochs=1,
        log_every_n_steps=cfg.LOGGER.LOG_EVERY_STEPS,
        deterministic=False,
        detect_anomaly=False,
        enable_progress_bar=True,
        logger=None,
        callbacks=callbacks,
    )

    # load model weights
    logger.info("Loading checkpoints from {}".format(cfg.TEST.CHECKPOINTS))
    state_dict = torch.load(cfg.TEST.CHECKPOINTS, map_location="cpu")["state_dict"]
    # This entry is dropped because the sequence length is flexible. Use a
    # default so checkpoints saved without it do not raise KeyError.
    state_dict.pop("denoiser.PPE.pe", None)
    model.load_state_dict(state_dict, strict=False)

    all_metrics = {}
    replication_times = cfg.TEST.REPLICATION_TIMES
    # One seed per replication: 0, n, 2n, ..., (n - 1) * n.
    # The previous np.arange(0, n * 10, n) always produced exactly 10 seeds,
    # so zipping it with range(n) silently capped the loop at 10 runs while
    # the confidence interval below was still divided by sqrt(n). For n <= 10
    # the seeds actually consumed are unchanged.
    seeds = np.arange(replication_times) * replication_times

    # calculate metrics
    for i, seed in zip(tqdm(range(replication_times), desc="Evaluation among replications"), seeds):
        logger.info(f"Evaluation Replication {i}")
        metrics = trainer.test(model, datamodule=datasets)[0]  # TODO
        # change the seed: it is applied after the run, so it governs the
        # NEXT replication; the first one runs under cfg.SEED_VALUE.
        pl.seed_everything(int(seed))
        # save metrics
        for key, item in metrics.items():
            all_metrics.setdefault(key, []).append(item)

    all_metrics_new = {}
    for key, item in all_metrics.items():
        mean, conf_interval = get_metric_statistics(np.array(item),
                                                    replication_times)
        all_metrics_new[key + "/mean"] = mean
        all_metrics_new[key + "/conf_interval"] = conf_interval
    print_table("Mean Metrics", all_metrics_new)
    # keep the raw per-replication values alongside the summary statistics
    all_metrics_new.update(all_metrics)

    # save metrics to file
    metric_file = output_dir.parent / f"metrics_{cfg.TIME}.json"
    with open(metric_file, "w", encoding="utf-8") as f:
        json.dump(all_metrics_new, f, indent=4)
    logger.info(f"Testing done, the metrics are saved to {str(metric_file)}")


if __name__ == "__main__":
    main()