# tetracorder.py
# Philip G. Brodrick
import argparse
import yaml
import json
import numpy as np
import os
import logging
from typing import List
from collections import OrderedDict
import emit_utils.common_logs
import re
import pandas as pd
DEFAULT_GROUPS = [1,2]
DEFAULT_LOGFILE = None
DEFAULT_LOGLEVEL = 'INFO'
DEFAULT_SORT_KEYS = False
def recast_globals(to_recast: List, globals: dict):
    """ Substitute any global placeholder keys (e.g. '[GLBLFITALL]') found in a list of
    string tokens with their current values from the globals dictionary. """
    for _i in range(len(to_recast)):
        for key, item in globals.items():
            to_recast[_i] = to_recast[_i].replace(key, item)
    return to_recast
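
# Example of the substitution performed by recast_globals (illustrative values only):
#   recast_globals(['[GLBLFITALL]'], {'[GLBLFITALL]': '0.5'})  ->  ['0.5']
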
def decode_expert_system(tetra_expert_file, groups=DEFAULT_GROUPS, log_file=DEFAULT_LOGFILE,
                         log_level=DEFAULT_LOGLEVEL):
    """ Convert a tetracorder .txt 'expert system file' into a dictionary.  Not all
    parameters are preserved, only those necessary for EMIT calculation.

    Args:
        tetra_expert_file: tetracorder expert system file to read
        groups: tetracorder groups to use (all others will be ignored)
        log_file: file to write logging to
        log_level: logging level to use

    Returns:
        Converted dictionary of tetracorder file
    """
    if log_file is None:
        logging.basicConfig(format='%(message)s', level=log_level)
    else:
        logging.basicConfig(format='%(message)s', level=log_level, filename=log_file)
    emit_utils.common_logs.logtime()

    decoded_expert = OrderedDict()

    # read expert system file and strip comments
    with open(tetra_expert_file, 'r') as fin:
        expert_file_commented = fin.readlines()

    expert_file_text, orig_lineno = [], []
    for line_index, line in enumerate(expert_file_commented):
        if (not line.strip().startswith(r'\#') or 'TITLE=' in line) and len(line.strip()) > 0:
            orig_lineno.append(line_index)
            expert_file_text.append(line)
    del expert_file_commented

    # Go through expert system file one line at a time, after initializing key variables
    expert_line_index, group, spectrum, output_data, header, out_hdr, rows, cols = \
        0, None, None, None, True, None, 0, 0
    constituent_constraints = {}
    globals = {
        '[GLBLFITALL]': '[GLBLFITALL]',
        '[GLBLFDFIT]': '[GLBLFDFIT]',
        '[GLBLDPFIT]': '[GLBLDPFIT]',
        '[GLBLDPFITg2]': '[GLBLDPFITg2]',
        '[G2UMRBDA]': '[G2UMRBDA]',
    }
    groupnames = {}
    casenames = {}
    while expert_line_index < len(expert_file_text):
        # The Header flag excludes the definitions at the start
        if expert_file_text[expert_line_index].startswith('BEGIN SETUP'):
            header = False
        elif header:
            # Check for globals
            if expert_file_text[expert_line_index].startswith('=='):
                for key in globals.keys():
                    if key in expert_file_text[expert_line_index]:
                        line_remainder = expert_file_text[expert_line_index][len(key)+2:].strip()
                        if r'\#' in line_remainder:
                            line_remainder = line_remainder[:line_remainder.index(r'\#')]
                        globals[key] = line_remainder
            elif expert_file_text[expert_line_index].startswith('groupname'):
                split_line = expert_file_text[expert_line_index].strip().split()
                group_str = expert_file_text[expert_line_index][len('groupname ' + split_line[1])+1:expert_file_text[expert_line_index].index(r'\#')]
                group_str = group_str.replace('region', '').strip()
                group_str = group_str.replace(' broad', '-broad').strip()
                group_str = group_str.replace(' curve', '_curve').strip()
                group_str = group_str.replace('2-2.5um', '2um').strip()
                groupnames[split_line[1]] = f'group.{group_str}'
            elif expert_file_text[expert_line_index].startswith('casename'):
                split_line = expert_file_text[expert_line_index].strip().split()
                casenames[split_line[1]] = expert_file_text[expert_line_index][len('casename ' + split_line[1]):expert_file_text[expert_line_index].index(r'\#')]
            expert_line_index = expert_line_index + 1
            continue
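
        # Illustrative header lines handled above (hypothetical values, format inferred from the slicing):
        #   ==[GLBLFITALL] 0.5 \# a global threshold later substituted via recast_globals
        #   groupname 2 2-2.5um region \# mapped to the output directory name 'group.2um'
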
        # if we've gotten to the end of the record, time to pull everything together and write our output
        if expert_file_text[expert_line_index].startswith('endaction'):
            if group in groups:
                # Populate the entry
                entry = {}
                entry['longname'] = longname
                entry['group'] = group
                entry['groupname'] = groupnames[str(group)]
                entry['record'] = record
                entry['spectral_library'] = source_lib
                entry['name'] = name
                entry['data_type_scaling'] = data_type_scaling
                entry['features'] = features
                entry['constituent_constraints'] = constituent_constraints
                entry['use'] = use

                #decoded_expert[os.path.join(group, tetra_filename)] = entry
                decoded_expert[os.path.join(entry['groupname'], tetra_filename)] = entry
            # reset the per-record constraint accumulator only after the entry has been stored
            constituent_constraints = {}
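
        # Each stored entry is keyed '<groupname>/<tetra_filename>' (e.g. 'group.2um/...') and is
        # shaped roughly like (illustrative values): {'name': ..., 'longname': ..., 'group': 2,
        # 'groupname': 'group.2um', 'record': <library record>, 'spectral_library': <library id>,
        # 'data_type_scaling': 255.0, 'features': [...], 'constituent_constraints': {...}, 'use': ...}
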
        if 'TITLE=' in expert_file_text[expert_line_index]:
            toks = expert_file_text[expert_line_index].strip().split()
            name = toks[1].strip().split('=')[1]
            longname = name
            for _t in range(2, len(toks)):
                longname += ' ' + toks[_t]

        if 'use=' in expert_file_text[expert_line_index]:
            use = expert_file_text[expert_line_index].split('use=')[1].split(r'\#')[0].strip()

        # if keyword 'group' appears, define the current group name
        if expert_file_text[expert_line_index].startswith('group'):
            group = int(expert_file_text[expert_line_index].strip().split()[1])

        # SMALL keyword tells us to find the library record number
        if 'SMALL' in expert_file_text[expert_line_index]:
            record = int(expert_file_text[expert_line_index].strip().split()[3])
            source_lib = expert_file_text[expert_line_index].strip().split()[2].replace(']', '').replace('[', '')

        # 'define output' keyword tells us to get the 8 DN 255 scaling factor
        if 'define output' in expert_file_text[expert_line_index]:
            line_offset = 0
            for linehunt in range(100):
                if 'endoutput' in expert_file_text[expert_line_index+linehunt]:
                    break
                if expert_file_text[expert_line_index+linehunt][:2] == r'\#':
                    line_offset += 1
            tetra_filename = expert_file_text[expert_line_index+2+line_offset].strip().split()[0]
            data_type_scaling = float(expert_file_text[expert_line_index+3+line_offset].strip().split()[4])

        # 'define features' means we've found the location to get the critical feature elements:
        # the requisite wavelengths for now.  currently continuum removal threshold ct and lct/rct ignored
        valid_feature_constraints = ['ct', 'lct', 'rct', 'lct/rct>', 'rct/lct>', 'rcbblc>', 'rcbblc<', 'lcbbrc>', 'lcbbrc<', 'r*bd>']
        if 'define features' in expert_file_text[expert_line_index]:
            expert_line_index += 1
            features = []
            while ('endfeatures' not in expert_file_text[expert_line_index]):
                toks = expert_file_text[expert_line_index].strip().split()
                if len(toks) > 5 and toks[0].startswith('f') and (toks[1] == 'DLw' or toks[1] == 'MLw' or toks[1] == 'OLw'):
                    local_feature = {}
                    local_feature['continuum'] = [float(f) for f in toks[2:6]]
                    local_feature['feature_type'] = toks[1]

                    last_valid = len(toks)
                    for _t in range(len(toks)-1, 5, -1):
                        if r'\#' in toks[_t]:
                            last_valid = _t
                        elif toks[_t] in valid_feature_constraints:
                            local_feature[toks[_t]] = recast_globals([toks[_local_tok] for _local_tok in range(_t+1, last_valid)], globals)
                            last_valid = _t

                    features.append(local_feature)
                expert_line_index += 1
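
        # A feature line is expected to look roughly like (hypothetical values):
        #   f1 DLw 2.119 2.136 2.255 2.290  ct .04  r*bd> .01  \# comment
        # yielding e.g. {'continuum': [2.119, 2.136, 2.255, 2.29], 'feature_type': 'DLw', 'ct': ['.04'], 'r*bd>': ['.01']}
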
        valid_constituent_constraints = ['FIT', 'FITALL', 'DEPTH', 'DEPTHALL', 'DEPTH-FIT', 'FD', 'FDALL', 'FD-FIT', 'FD-DEPTH']
        if 'define constraints' in expert_file_text[expert_line_index]:
            expert_line_index += 1
            while ('endconstraint' not in expert_file_text[expert_line_index]):
                toks = expert_file_text[expert_line_index].strip().split()
                if toks[0] == 'constraint:':
                    last_valid = len(toks)
                    for _t in range(len(toks)-1, 0, -1):
                        if r'\#' in toks[_t]:
                            last_valid = _t
                        elif np.any([vcc in toks[_t] for vcc in valid_constituent_constraints]):
                            if '<' in toks[_t]:
                                logging.warning('less than found in constraints, revise expert reader')
                            constraint_values = toks[_t].strip().split('>')
                            constraint_name = constraint_values.pop(0)
                            for _local_tok in range(_t+1, last_valid):
                                constraint_values.append(toks[_local_tok])
                            constituent_constraints[constraint_name] = recast_globals(constraint_values, globals)
                expert_line_index += 1

        expert_line_index += 1

    return decoded_expert
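
# Example usage (hypothetical path; 'cmd.lib.setup.t5.27c1' stands in for a real expert system file):
#   decoded = decode_expert_system('cmd.lib.setup.t5.27c1', groups=[1, 2])
#   for key, entry in decoded.items():
#       print(key, entry['record'], entry['data_type_scaling'])
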
def read_mineral_fractions(file_list: List):
    """
    Read in a series of mineral fractions files from tetracorder, converting to dictionaries
    Args:
        file_list: list of files to read

    Returns:
        Dictionary keyed with unique file identifiers corresponding to expert system file
    """
    mineral_fractions = OrderedDict()
    mineral_names = [re.split(r'\.|-', os.path.basename(x))[0] for x in file_list]
    for _f, filename in enumerate(file_list):
        with open(filename, 'r') as fin:
            fractions_file_commented = fin.readlines()

        df = None
        for _line, line in enumerate(fractions_file_commented):
            if not line.strip().startswith('#'):
                df = pd.read_fwf(filename, skiprows=_line, header=None)
                df = df.loc[:, :6]
                df = df.dropna()
                break

        ## Hard coded due to inconsistent multi-line abundance file heading
        header = ['file', 'DN_scale', 'BD_factor', 'Band_depth', 'title', 'spectral_library', 'record']

        fraction_list = []
        if df is None:
            continue
        else:
            for idx in range(len(df[0])):
                local_entry = {}
                for headname, tok in zip(header, df.iloc[idx, :len(header)].tolist()):
                    if headname == 'spectral_library':
                        local_entry[headname] = tok.split('conv')[0]
                    else:
                        local_entry[headname] = tok
                if type(local_entry[header[0]]) == float and np.isnan(local_entry[header[0]]):
                    continue
                fraction_list.append(local_entry)

        mineral_fractions[mineral_names[_f]] = fraction_list

    return mineral_fractions
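
# Example usage (hypothetical file names; the leading token of each basename becomes the dictionary key):
#   fractions = read_mineral_fractions(['calcite.medgr.factors.txt', 'kaolinite-wxl.factors.txt'])
#   fractions['calcite'][0]['Band_depth']
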
def main():
    parser = argparse.ArgumentParser(description="Translate a tetracorder expert system file to a yaml or json dictionary")
    parser.add_argument('tetra_expert_file', type=str, metavar='TETRA_EXPERT_SYSTEM')
    parser.add_argument('converted_file', type=str, metavar='output_converted_tetrafile')
    parser.add_argument('-groups', type=int, default=DEFAULT_GROUPS, nargs='*')
    parser.add_argument('-sort_keys', type=int, default=int(DEFAULT_SORT_KEYS), choices=[0, 1])
    parser.add_argument('-log_file', type=str, default=DEFAULT_LOGFILE)
    parser.add_argument('-log_level', type=str, default=DEFAULT_LOGLEVEL)
    args = parser.parse_args()
    args.sort_keys = args.sort_keys == 1

    emit_utils.common_logs.logtime()
    if args.log_file is None:
        logging.basicConfig(format='%(message)s', level=args.log_level)
    else:
        logging.basicConfig(format='%(message)s', level=args.log_level, filename=args.log_file)

    decoded_expert = decode_expert_system(args.tetra_expert_file, args.groups, args.log_file, log_level=args.log_level)

    with open(args.converted_file, 'w') as file:
        if os.path.splitext(args.converted_file)[-1] == '.yaml':
            outstring = yaml.dump(decoded_expert, sort_keys=args.sort_keys)
        elif os.path.splitext(args.converted_file)[-1] == '.json':
            outstring = json.dumps(decoded_expert, sort_keys=args.sort_keys, indent=4)
        else:
            raise ValueError('converted_file must end in .yaml or .json')
        file.write(outstring)

    emit_utils.common_logs.logtime()


if __name__ == "__main__":
    main()
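
# Example invocation (hypothetical paths):
#   python tetracorder.py cmd.lib.setup.t5.27c1 decoded_expert.yaml -groups 1 2 -sort_keys 0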