#!/usr/bin/env python3
"""
Runs several calibration executions in batch_execution. A yml file is used to config the batch executions.
"""

import argparse
import os
import shutil

import yaml
import jinja2
from jinja2 import Environment, FileSystemLoader
from sklearn.model_selection import StratifiedKFold, KFold, LeaveOneOut, StratifiedShuffleSplit
from colorama import Fore, Back, Style

from atom_core.config_io import uriReader
from atom_core.dataset_io import loadJSONFile, filterCollectionsFromDataset
from atom_core.system import resolvePath, execute
from atom_core.utilities import atomWarn
from atom_batch_execution.folder_io import stripAutomaticSuffixes


def bprint(text):
    """bprint (batch print) always prints in blue with a yellow background."""
    print(Fore.BLUE + Back.YELLOW + text + Style.RESET_ALL)


def generateClasses(dataset):
    """
    Generate classes based on an ATOM dataset.
    Classes follow the format detected_pattern--detected_sensor1-detected_sensor2-[...]---[...]

    Args:
        dataset (dict): ATOM dataset.

    Returns:
        tuple: A tuple containing the classes and the collection keys.
    """
    classes = []
    collection_keys = list(dataset['collections'].keys())
    for collection_key in collection_keys:
        class_name = ''
        for pattern_key in dataset['patterns'].keys():
            detected_sensors = []  # reset per pattern, so each pattern section lists only its own sensors
            for sensor_key in dataset['sensors'].keys():
                if dataset['collections'][collection_key]['labels'][pattern_key][sensor_key]['detected']:
                    detected_sensors.append(sensor_key)
            detected_pattern_and_sensors = pattern_key + '--' + '-'.join(detected_sensors)
            class_name += detected_pattern_and_sensors + '---'
        classes.append(class_name.rstrip('---'))

    return classes, collection_keys
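
# As a sketch of the output: for a hypothetical dataset with one pattern
# 'pattern_1' and sensors 'camera' and 'lidar', a collection where both sensors
# detect the pattern gets the class string 'pattern_1--camera-lidar'; with a
# second pattern 'pattern_2' seen only by 'camera', the class would be
# 'pattern_1--camera-lidar---pattern_2--camera'. All names are illustrative.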


def main():

    # Parse command line arguments
    ap = argparse.ArgumentParser()
    ap.add_argument("-v", "--verbose", help="Prints the stdout_data of each command to the terminal.",
                    action='store_true', default=False)
    ap.add_argument("-tf", "--template_filename", help="Jinja2 yaml containing the batches.",
                    required=True, type=str)
    ap.add_argument("-df", "--data_filename", help="Yaml containing variables used in the template file.",
                    required=True, type=str)
    ap.add_argument("-of", "--output_folder", help="Folder where to store the results.",
                    required=True, type=str)
    ap.add_argument("-ow", "--overwrite", help="Overwrite the output folder if needed.",
                    required=False, action='store_true')
    ap.add_argument("-dr", "--dry_run", help="Run without actually executing the processes.",
                    required=False, action='store_true')
    ap.add_argument("-rs", "--run_suffix", help="Suffix used to signal multiple runs of the same experiment.",
                    required=False, default='_run', type=str)
    ap.add_argument("-fs", "--fold_suffix", help="Suffix used to signal multiple folds of the same run.",
                    required=False, default='_fold', type=str)
    args = vars(ap.parse_args())
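
    # Illustrative invocation (file and folder names are hypothetical):
    #   batch_execution -tf batch.yml.j2 -df data.yml -of /tmp/results -ow -v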

    # Load the data.yml file
    with open(args['data_filename']) as f:
        file_content = f.read()
    data = yaml.safe_load(file_content)

    # Load the dataset and filter out the collections to remove
    dataset = loadJSONFile(data['dataset_path'])
    collections_to_remove = data['collections_to_remove']
    args["collection_selection_function"] = lambda x: int(x) not in collections_to_remove
    args['use_incomplete_collections'] = None
    args['remove_partial_detections'] = None
    filterCollectionsFromDataset(dataset, args)

    # Generate classes and collection keys
    classes, collection_keys = generateClasses(dataset)

    # Safeguard to guarantee backwards compatibility with data files that have no cross_validation section
    if 'cross_validation' not in data:
        data['cross_validation'] = {'type': None, 'n_splits': None}

    # Process the cross validation configuration
    if data['cross_validation']['type'] == 'stratified-k-fold':
        # Use StratifiedKFold for cross validation and split the data into folds
        cross_validator = StratifiedKFold(n_splits=data['cross_validation']['n_splits'], shuffle=True)
        folds = cross_validator.split(collection_keys, classes)
    elif data['cross_validation']['type'] == 'k-fold':
        # Use KFold for cross validation
        cross_validator = KFold(n_splits=data['cross_validation']['n_splits'], shuffle=True)
        folds = cross_validator.split(collection_keys)
    elif data['cross_validation']['type'] == 'leave-one-out':
        # Use LeaveOneOut for cross validation
        cross_validator = LeaveOneOut()
        folds = cross_validator.split(collection_keys)
    elif data['cross_validation']['type'] == 'stratified-shuffle-split' and data['cross_validation']['train_size']:
        # Use StratifiedShuffleSplit for cross validation
        cross_validator = StratifiedShuffleSplit(n_splits=data['cross_validation']['n_splits'],
                                                 train_size=data['cross_validation']['train_size'])
        folds = cross_validator.split(collection_keys, classes)
    else:
        # No (or unsupported) cross validation type: use a single fold containing all
        # collections for both the train and the test split.
        folds = [(range(len(collection_keys)), range(len(collection_keys)))]
        bprint('Running without any cross validation.')

    # Convert folds to a list, in order to iterate over it several times
    folds = list(folds)

    # Transform indices of collections into collection keys
    fold_list = [[[int(collection_keys[element]) for element in split] for split in fold] for fold in folds]
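
    # Sketch of the resulting structure (keys are illustrative): with collections
    # '0'..'3' and 2-fold CV, fold_list could be [[[0, 2], [1, 3]], [[1, 3], [0, 2]]],
    # i.e. one [train_keys, test_keys] pair per fold.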

    # The dataset is no longer needed
    del dataset

    # Add the dataset dirname and the folds to the template data
    dataset_dirname = os.path.dirname(data["dataset_path"])
    data["dataset_dirname"] = dataset_dirname
    data['folds'] = fold_list

    # Template engine setup
    file_loader = FileSystemLoader(os.path.dirname(args['template_filename']))
    env = Environment(loader=file_loader, undefined=jinja2.StrictUndefined)
    env.add_extension('jinja2.ext.do')
    template = env.get_template(os.path.basename(args['template_filename']))

    # Save the rendered jinja file, just for debugging
    rendered = template.render(data)
    with open('auto_rendered.yaml', 'w') as file:
        file.write(rendered)

    config = yaml.safe_load(rendered)
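
    # The rendered template is expected to yield a config with a preprocessing
    # command and a dict of experiments; a minimal sketch (commands, names and
    # paths are hypothetical) of the rendered yaml:
    #
    #   preprocessing:
    #     cmd: "echo preprocessing"
    #   experiments:
    #     experiment_a_run1_fold0:
    #       cmd: "echo calibrating"
    #       files_to_collect:
    #         - /tmp/results.csv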

    # Create the output folder
    args['output_folder'] = resolvePath(args['output_folder'])
    if not os.path.exists(args['output_folder']):
        os.mkdir(args['output_folder'])  # Create the new folder
    elif os.path.exists(args['output_folder']) and args['overwrite']:
        shutil.rmtree(args['output_folder'])  # Remove the old folder
        os.mkdir(args['output_folder'])  # Create the new folder

    # Run preprocessing
    print('\n')
    bprint('Executing preprocessing command:\n' + config['preprocessing']['cmd'])
    execute(config['preprocessing']['cmd'], verbose=args['verbose'], save_path=args['output_folder'],
            save_filename_additions='preprocessing_')

    # Run experiments
    num_experiments = len(config['experiments'])
    for idx, (experiment_key, experiment) in enumerate(config['experiments'].items()):
        bprint(Style.BRIGHT + 'Experiment ' + str(idx + 1) + ' of ' + str(num_experiments) + ': ' + experiment_key)
        bprint('Executing command:\n' + experiment['cmd'])
        if args['dry_run']:
            bprint('Running in dry run mode...')
            continue

        experiment_folder = args['output_folder'] + '/' + experiment_key
        if os.path.exists(experiment_folder) and not args['overwrite']:
            atomWarn('Folder ' + experiment_folder + ' exists. Skipping batch experiment.')
            continue
        else:
            os.mkdir(experiment_folder)

        # Start executing the command
        execute(experiment['cmd'], verbose=args['verbose'], save_path=experiment_folder)

        # Save the experiment settings
        for experiment_type in data['experiments']:
            if experiment_type["name"] == stripAutomaticSuffixes(experiment_key, args):
                print(f"Saving settings file for {Fore.BLUE}{experiment_type['name']}{Style.RESET_ALL}")
                settings_file_path = f"{experiment_folder}/{experiment_type['name']}_settings.yml"
                with open(settings_file_path, 'w') as settings_file:
                    yaml.dump(experiment_type, settings_file, sort_keys=False)

        # Collect the output files produced by the experiment
        for file in experiment['files_to_collect']:
            if file is None:
                raise ValueError('File in files_to_collect is None. Aborting.')

            resolved_file, _, _ = uriReader(file)
            if not os.path.exists(resolved_file):
                raise ValueError('File ' + file + ', resolved to ' + resolved_file +
                                 ', should be collected but does not exist.')

            filename_out = experiment_folder + '/' + os.path.basename(resolved_file)
            print(Fore.BLUE + Back.YELLOW + 'Copying file ' + resolved_file + ' to ' + filename_out + Style.RESET_ALL)
            execute('cp ' + resolved_file + ' ' + filename_out, verbose=False)


if __name__ == "__main__":
    main()