-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathworkflow_docker.py
840 lines (707 loc) · 34.6 KB
/
workflow_docker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
#!/usr/bin/env python3
"""Docker workflow runner"""
import os
import io
import json
import time
import sys
import subprocess
from typing import Optional
from threading import Event, Thread
from collections.abc import Callable
import logging
#DOCKER_IMAGE = 'agdrone/drone-workflow:1.1'
DOCKER_IMAGE = 'chrisatua/development:drone_makeflow'
# Maximum lines of output that's cached before being written to disk
MAX_CACHED_OUTPUT_LINES = 40
# Definitions for waiting on command outputs
MAX_READER_WAIT_LOOP = 200
MAX_READER_WAIT_SEC = 0.1
def get_command_map() -> dict:
    """Builds the lookup table that maps workflow command names to handler functions
    Return:
        A dictionary keyed by command name whose values are the handler functions
    """
    module = sys.modules[__name__]
    # Pair each supported command with its handler; resolved through the module so the
    # handlers only need to exist by the time this function is called
    command_handlers = (
        ('soilmask', module.handle_soilmask),
        ('soilmask_ratio', module.handle_soilmask_ratio),
        ('plotclip', module.handle_plotclip),
        ('find_files2json', module.handle_find_files2json),
        ('canopycover', module.handle_canopycover),
        ('greenness_indices', module.handle_greenness_indices),
        ('merge_csv', module.handle_merge_csv),
        ('git', module.handle_git_repo),
    )
    return dict(command_handlers)
def _load_json_file(filename: str, error_func: Callable=None) -> Optional[object]:
"""Handles loading a JSON file
Arguments:
filename: the path to the JSON file
error_func: optional function to write errors to
Return:
Returns the contents of the loaded JSON file
"""
result = None
try:
with open(filename, 'r', encoding='utf8') as in_file:
result = json.load(in_file)
except json.JSONDecodeError as ex:
msg = 'A JSON decode error was caught while loading JSON file "%s"' % filename
logging.exception(msg)
if error_func:
error_func((msg, str(ex)))
except Exception as ex:
msg = 'An unknown exception was caught while loading JSON file "%s"' % filename
logging.exception(msg)
if error_func:
error_func((msg, str(ex)))
return result
def _find_parameter_values(parameters: list, field_names: tuple) -> tuple:
"""Returns a tuple of found parameter values associated with the field names
Arguments:
parameters: the list of parameters to search
field_names: the ordered tuple of field names
Return:
A tuple containing found field values associated with the specified field names, in the same order of the field names.
If a field name is not found, None is returned in its place.
"""
found = {}
for one_parameter in parameters:
found[one_parameter['field_name']] = one_parameter['value']
return (found.get(one_name, None) for one_name in field_names)
def _replace_folder_path(path: str, from_folder: str, to_folder: str) -> Optional[str]:
"""Changes the path from the source ('from') folder to the destination ('to') folder
Arguments:
path: the path to adjust
from_folder: the folder to change from
to_folder: the folder to change the path to
Return:
A copy of the path with the folder changed when 'path' starts with 'from_folder', othwerwise
None is returned
Notes:
Only fully qualified partial paths are considered valid. Thus, '/a/b/c' is NOT considered the start of path '/a/b/concord', but
is the considered the start of '/a/b/c' and '/a/b/c/dogs.csv'
"""
# Make sure we have a legitimate 'from' path
if not path.startswith(from_folder):
logging.debug('Replace folder path: original path "%s" doesn\'t start with expected folder "%s"', path, from_folder)
return None
check_idx = len(from_folder)
if from_folder[-1:] == '/' or from_folder[-1:] == '\\':
check_idx -= 1
if not path[check_idx] =='/' and not path[check_idx] =='\\':
return None
# Return the new path
rem = path[len(from_folder):]
if rem[0] == '/' or rem[0] == '\\':
rem = rem[1:]
return os.path.join(to_folder, rem)
def _consume_output(reader: io.BufferedReader, output_func: Callable, done_event: Event):
    """Consumes lines from the reader and forwards them to output_func in batches
    Arguments:
        reader: object to read lines from (may be None)
        output_func: callable receiving (list of decoded lines, True)
        done_event: the event to set when all output has been consumed
    """
    if reader is None:
        # Set the event anyway so a caller waiting on completion doesn't hang forever
        done_event.set()
        return
    lines = []
    while True:
        try:
            line = reader.readline()
            if not line:
                # EOF reached
                break
            if isinstance(line, bytes):
                line = line.decode('UTF-8')
            logging.debug(line.rstrip('\n'))
            lines.append(line)
            # Flush periodically so long-running commands don't accumulate output in memory
            if len(lines) >= MAX_CACHED_OUTPUT_LINES:
                output_func(lines, True)
                lines = []
        except Exception:
            # Stop reading on error: the previous implementation logged and looped again,
            # spinning forever when readline() raised persistently
            logging.exception("Ignoring exception while waiting on messages")
            break
    if lines:
        output_func(lines, True)
    done_event.set()
def _write_command_json(json_file_path: str, json_args: object):
"""Writes the passed in object the specific file
Arguments:
json_file_path: the file to write to
json_args: the json to write to the file
Exceptions:
Raises RunttimeError if a problem occurs when writing out the JSON
"""
with open(json_file_path, 'wt', encoding='utf8') as out_file:
try:
out_file.write(json.dumps(json_args, indent=2))
except json.JSONDecodeError as ex:
msg = 'JSON exception caught while writing command arguments to "%s"' % json_file_path
logging.exception(msg)
raise RuntimeError(msg) from ex
except OSError as ex:
msg = 'OS exception caught while writing command arguments to "%s"' % json_file_path
logging.exception(msg)
raise RuntimeError(msg) from ex
except Exception as ex:
msg = 'Unknown exception caught while writing command arguments to "%s"' % json_file_path
logging.exception(msg)
raise RuntimeError(msg) from ex
def _run_command(command: str, input_folder: str, output_folder: str, json_file_path: str, msg_func: Callable, err_func: Callable,
                 additional_mounts: tuple=None):
    """Handles the details of executing the docker image command
    Arguments:
        command: the command string to run
        input_folder: the folder containing the command input
        output_folder: the folder containing the command output
        json_file_path: the JSON file to pass to the command
        msg_func: function to write messages to
        err_func: function to write errors to
        additional_mounts: optional tuple of additional mounts for the docker command; one or more
            [source_path, mount_point] pairs (malformed pairs are skipped with a warning)
    Return:
        The docker process return code, or -1 when the process could not be started
    """
    # Base invocation: bind-mount the input/output folders and the JSON arguments file to the
    # locations the containerized apps expect ('/input', '/output', the scif args path)
    run_command = ['docker',
                   'run',
                   '--rm',
                   '-v',
                   input_folder + ':/input',
                   '-v',
                   output_folder + ':/output',
                   '-v',
                   json_file_path + ':/scif/apps/src/jx-args.json'
                   ]
    if additional_mounts is not None:
        for one_mount in additional_mounts:
            if len(one_mount) == 2:
                run_command.append('-v')
                run_command.append(one_mount[0] + ':' + one_mount[1])
            else:
                # Bad mount entries are reported but do not abort the command
                msg1 = 'Warning: bad additional mount specified: %s' % str(one_mount)
                msg2 = ' should consist of a [source path, mount path] pair'
                logging.warning(msg1)
                logging.warning(msg2)
                msg_func((msg1, msg2), True)
    run_command.extend([DOCKER_IMAGE, 'run', command])
    logging.debug("Running command: %s", run_command)
    # pylint: disable=consider-using-with
    proc = subprocess.Popen(run_command, bufsize=-1, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    return_value = -1
    if proc:
        # Events signal when each reader thread has drained its pipe
        msg_event = Event()
        err_event = Event()
        msg_event.clear()
        err_event.clear()
        # Consume stdout/stderr on daemon threads so a full pipe can't block the child process
        msg_thread = Thread(target=_consume_output, args=(proc.stdout, msg_func, msg_event), daemon=True)
        err_thread = Thread(target=_consume_output, args=(proc.stderr, err_func, err_event), daemon=True)
        msg_thread.start()
        err_thread.start()
        # Loop here processing the output until the proc finishes
        logging.debug('Waiting for process to finish')
        while proc.returncode is None:
            proc.poll()
            # Sleep and try again for process to complete
            time.sleep(1)
        logging.debug("Return code: %s", str(proc.returncode))
        return_value = proc.returncode
        # Wait for the rest of the command's ouput to be read, bounded by
        # MAX_READER_WAIT_LOOP iterations of MAX_READER_WAIT_SEC each
        logging.debug("Checking on readers")
        cur_wait_counter = 0
        displayed_message = False
        skipped_messages = 0
        while (msg_event.wait(1) is False or err_event.wait(1) is False) and cur_wait_counter < MAX_READER_WAIT_LOOP:
            # Only log every 10th iteration to keep the debug log readable
            if displayed_message is False or skipped_messages % 10 == 0:
                logging.debug("Sleeping on readers: %d", cur_wait_counter)
                displayed_message = True
                skipped_messages = 0
            time.sleep(MAX_READER_WAIT_SEC)
            cur_wait_counter += 1
            skipped_messages += 1
        # If the readers still haven't finished, give up on the remaining output but
        # return the process result anyway (best effort)
        if msg_event.wait(1) is False or err_event.wait(1) is False:
            logging.error('Unable to retrieve messages and/or errors for command: %s', command)
            logging.warning('Ignoring problems with fetching output for command: %s', command)
    return return_value
def _get_results_json(working_folder: str, error_func: Callable=None, recursive: bool=False) -> Optional[object]:
    """ Loads and returns the json resulting from running the workflow
    Arguments:
        working_folder: the folder the results are stored in
        error_func: the function to write errors to
        recursive: will recurse into subfolders when True. Otherwise only working_folder is checked
    Returns:
        The contents of the results file when not recursive. When recursive a list of all the results is returned
    """
    # pylint: disable=too-many-nested-blocks,too-many-branches
    res = {}
    results_path = os.path.join(working_folder, 'result.json')
    if os.path.exists(results_path):
        res = _load_json_file(results_path, error_func)
        # _load_json_file() returns None when the file can't be read/parsed; fall back to an
        # empty result instead of crashing on the membership tests below
        if res is None:
            res = {}
        # Map container-side '/output' paths in the 'file' entries back to the host folder
        if 'file' in res:
            mapped_files = []
            for one_file in res['file']:
                if 'path' in one_file:
                    one_file['path'] = _replace_folder_path(one_file['path'], '/output', working_folder)
                mapped_files.append(one_file)
            res['file'] = mapped_files
        # Same mapping for 'file' entries nested inside 'container' entries
        if 'container' in res:
            mapped_container = []
            for one_entry in res['container']:
                if 'file' in one_entry:
                    mapped_files = []
                    for one_file in one_entry['file']:
                        if 'path' in one_file:
                            one_file['path'] = _replace_folder_path(one_file['path'], '/output', working_folder)
                        mapped_files.append(one_file)
                    one_entry['file'] = mapped_files
                mapped_container.append(one_entry)
            res['container'] = mapped_container
    if recursive is True:
        # Switch to list form and gather non-empty results from every subfolder
        res = [res] if res else []
        for one_file in os.listdir(working_folder):
            cur_path = os.path.join(working_folder, one_file)
            if os.path.isdir(cur_path):
                new_results = _get_results_json(cur_path, error_func, recursive)
                for one_result in new_results:
                    if one_result:
                        res.append(one_result)
    return res
def _repoint_files_json_dir(filename: str, source_folder: str, target_folder: str, working_folder: str) -> Optional[str]:
    """ Rewrites the DIR entries in a found-files JSON file so they point at the target folder
    Arguments:
        filename: the file to load and process
        source_folder: the source folder to replace with target folder; if empty or None, a best guess is applied
        target_folder: the target folder for the DIR entries
        working_folder: the working folder to place the updated file in
    Return:
        The name of the adjusted JSON file when successful. Otherwise the None is returned
    Notes:
        The new file has the same name as the original but resides in the working folder; an
        existing file by that name in the working folder is overwritten.
    """
    # Validate the parameters before doing any work
    if not os.path.isfile(filename):
        logging.warning('Invalid file specified to repoint files JSON "%s"' % filename)
        return None
    if not os.path.isdir(working_folder):
        logging.warning('Invalid working folder specified to repoint files JSON "%s"' % working_folder)
        return None
    # Load the JSON and make sure it has the expected layout
    file_json = _load_json_file(filename)
    if file_json is None:
        logging.warning('Unable to load JSON file when repointing files JSON "%s"' % filename)
        return None
    if not isinstance(file_json, dict):
        logging.warning('Unknown JSON format when repointing files JSON "%s"' % filename)
        return None
    if 'FILE_LIST' not in file_json:
        logging.warning('JSON missing FILE_LIST key when repointing files JSON "%s"' % filename)
        return None
    result_file = os.path.join(working_folder, os.path.basename(filename))
    file_list = file_json['FILE_LIST']
    if not isinstance(file_list, (list, tuple, set)):
        logging.warning('FILE_LIST value is not a list of files for repointing files JSON "%s"' % filename)
        return None
    try:
        # Derive a source folder from the first entry when one wasn't supplied
        if not source_folder:
            first_dir = file_list[0]['DIR']
            if first_dir[-1:] == '/' or first_dir[-1:] == '\\':
                first_dir = first_dir[:-1]
            source_folder = os.path.dirname(first_dir)
        # Rewrite the DIR entry of every file that lives under the source folder
        repointed = []
        for one_file in file_list:
            entry = dict(one_file)
            if entry['DIR'].startswith(source_folder):
                entry['DIR'] = _replace_folder_path(entry['DIR'], source_folder, target_folder)
            repointed.append(entry)
        with open(result_file, 'w', encoding='utf8') as out_file:
            json.dump({"FILE_LIST": repointed}, out_file, indent=2)
    except Exception:
        logging.exception('Exception caught while repointing files JSON: "%s"' % filename)
        return None
    return result_file
def _handle_missing_parameters(process_name: str, parameters: tuple, parameter_names: tuple) -> None:
"""Common missing parameter handler
Arguments:
process_name: the name to use for any messages
params: a tuple of values to check
parameter_names: the names of the prameters for any messages
Exceptions:
RuntimeError is raised if any of the parameters are None
"""
missing_parameters = []
parameter_name_len = len(parameter_names)
for idx, value in enumerate(parameters):
if value is None:
missing_parameters.append(parameter_names[idx] if idx < parameter_name_len else 'Unknown_%d' % idx)
if missing_parameters:
msg = 'Missing required parameter(s) "' + '","'.join(missing_parameters) + '"" for ' + process_name
logging.error(msg)
raise RuntimeError(msg)
def _handle_missing_files(process_name: str, parameters: tuple, parameter_names: tuple) -> None:
"""Common missing file handler
Arguments:
process_name: the name to use for any messages
params: a tuple of values to check
parameter_names: the names of the prameters for any messages
Exceptions:
RuntimeError is raised if any of the parameters are None
"""
invalid_parameters = []
invalid_values = []
parameter_name_len = len(parameter_names)
for idx, path in enumerate(parameters):
if not os.path.exists(path) or not os.path.isfile(path):
invalid_parameters.append(parameter_names[idx] if idx < parameter_name_len else 'Unknown_%d' % idx)
invalid_values.append(path)
if invalid_parameters:
msg = 'Required files "' + ('","'.join(invalid_parameters)) + '" for ' + process_name + \
' are missing or are not files: "' + ('","'.join(invalid_values)) + '"'
logging.error(msg,)
raise RuntimeError(msg)
def _handle_missing_folders(process_name: str, parameters: tuple, parameter_names: tuple) -> None:
"""Common missing file handler
Arguments:
process_name: the name to use for any messages
params: a tuple of values to check
parameter_names: the names of the prameters for any messages
Exceptions:
RuntimeError is raised if any of the parameters are None
"""
invalid_parameters = []
invalid_values = []
parameter_name_len = len(parameter_names)
for idx, path in enumerate(parameters):
if not os.path.exists(path) or not os.path.isdir(path):
invalid_parameters.append(parameter_names[idx] if idx < parameter_name_len else 'Unknown_%d' % idx)
invalid_values.append(path)
if invalid_parameters:
msg = 'Required folders "' + ('","'.join(invalid_parameters)) + '" for ' + process_name + \
' are missing or are not folders: "' + ('","'.join(invalid_values)) + '"'
logging.error(msg,)
raise RuntimeError(msg)
def handle_soilmask(parameters: tuple, input_folder: str, working_folder: str, msg_func: Callable, err_func: Callable) -> Optional[dict]:
    """Handle running the soilmask algorithm
    Arguments:
        parameters: the specified parameters for the algorithm
        input_folder: the base folder where input files are located
        working_folder: the working folder for the algorithm
        msg_func: function to write messages to
        err_func: function to write errors to
    Return:
        A dictionary of additional parameters to pass to the next command or None
    """
    # Pull out the expected arguments and validate the mandatory ones
    image_path, options = _find_parameter_values(parameters, ('image', 'options'))
    _handle_missing_parameters('soilmask', (image_path,), ('image',))
    _handle_missing_files('soilmask', (image_path,), ('image',))
    # Derive the masked output file name from the source image name
    base_name, extension = os.path.splitext(os.path.basename(image_path))
    mask_filename = base_name + '_mask' + extension
    # Write the JSON arguments to disk for the command
    json_args = {
        'SOILMASK_SOURCE_FILE': _replace_folder_path(image_path, input_folder, '/input'),
        'SOILMASK_MASK_FILE': mask_filename,
        'SOILMASK_WORKING_FOLDER': '/output',
        'SOILMASK_OPTIONS': options if options is not None else '',
    }
    json_file_path = os.path.join(working_folder, 'args.json')
    _write_command_json(json_file_path, json_args)
    logging.debug("Command JSON: %s", str(json_args))
    # Run the command and collect its results only on success
    if _run_command('soilmask', input_folder, working_folder, json_file_path, msg_func, err_func) == 0:
        return _get_results_json(working_folder, err_func)
    return None
def handle_soilmask_ratio(parameters: tuple, input_folder: str, working_folder: str, msg_func: Callable, err_func: Callable) -> \
        Optional[dict]:
    """Handle running the soilmask by ratio algorithm
    Arguments:
        parameters: the specified parameters for the algorithm
        input_folder: the base folder where input files are located
        working_folder: the working folder for the algorithm
        msg_func: function to write messages to
        err_func: function to write errors to
    Return:
        A dictionary of additional parameters to pass to the next command or None
    """
    # Find our arguments
    # Fix: the ratio was previously looked up under the empty field name '' so a caller-supplied
    # ratio could never be found; it's now correctly looked up under 'ratio'
    image_path, ratio, options = _find_parameter_values(parameters, ('image', 'ratio', 'options'))
    # Ensure we have our mandatory parameters
    _handle_missing_parameters('soilmask ratio', (image_path,), ('image',))
    _handle_missing_files('soilmask ratio', (image_path,), ('image',))
    if options is None:
        options = ''
    # Determine if we have a ratio and default it if not
    if ratio is None:
        ratio = 1.0
    options += ' --ratio ' + str(ratio)
    # Get the output masked file name
    cur_filename, cur_ext = os.path.splitext(os.path.basename(image_path))
    mask_filename = cur_filename + '_mask' + cur_ext
    # Write the JSON arguments to disk for the command
    json_args = {
        'SOILMASK_RATIO_SOURCE_FILE': _replace_folder_path(image_path, input_folder, '/input'),
        'SOILMASK_RATIO_MASK_FILE': mask_filename,
        'SOILMASK_RATIO_WORKING_FOLDER': '/output',
        'SOILMASK_RATIO_OPTIONS': options if options is not None else '',
    }
    json_file_path = os.path.join(working_folder, 'args.json')
    _write_command_json(json_file_path, json_args)
    logging.debug("Command JSON: %s", str(json_args))
    # Run the command
    ret_value = _run_command('soilmask_ratio', input_folder, working_folder, json_file_path, msg_func, err_func)
    command_results = None
    if ret_value == 0:
        command_results = _get_results_json(working_folder, err_func)
    return command_results
def handle_plotclip(parameters: tuple, input_folder: str, working_folder: str, msg_func: Callable, err_func: Callable) -> Optional[dict]:
    """Handle running the plotclip algorithm
    Arguments:
        parameters: the specified parameters for the algorithm
        input_folder: the base folder where input files are located
        working_folder: the working folder for the algorithm
        msg_func: function to write messages to
        err_func: function to write errors to
    Return:
        A dictionary of additional parameters to pass to the next command or None
    """
    # Pull out the expected arguments and validate the mandatory ones
    source_image, plot_geometries, options = _find_parameter_values(parameters, ('image', 'geometries', 'options'))
    _handle_missing_parameters('plotclip', (source_image, plot_geometries), ('image', 'plot_geometries'))
    _handle_missing_files('plotclip', (source_image, plot_geometries), ('image', 'plot_geometries'))
    # Write the arguments file for the containerized app
    json_args = {
        'PLOTCLIP_SOURCE_FILE': _replace_folder_path(source_image, input_folder, '/input'),
        'PLOTCLIP_PLOTGEOMETRY_FILE': _replace_folder_path(plot_geometries, input_folder, '/input'),
        'PLOTCLIP_WORKING_FOLDER': '/output',
        'PLOTCLIP_OPTIONS': options if options is not None else '',
    }
    json_file_path = os.path.join(working_folder, 'args.json')
    _write_command_json(json_file_path, json_args)
    logging.debug("Command JSON: %s", str(json_args))
    # Run the command; pass the clipped file name and folder along for downstream commands
    ret_value = _run_command('plotclip', input_folder, working_folder, json_file_path, msg_func, err_func)
    if ret_value != 0:
        return None
    results = _get_results_json(working_folder, err_func)
    results['file_name'] = os.path.basename(source_image)
    results['top_path'] = working_folder
    return results
def handle_find_files2json(parameters: tuple, input_folder: str, working_folder: str, msg_func: Callable, err_func: Callable) -> \
        Optional[dict]:
    """Handle running the file finding algorithm
    Arguments:
        parameters: the specified parameters for the algorithm
        input_folder: the base folder where input files are located
        working_folder: the working folder for the algorithm
        msg_func: function to write messages to
        err_func: function to write errors to
    Return:
        A dictionary of additional parameters to pass to the next command or None
    """
    search_name, search_folder = _find_parameter_values(parameters, ('file_name', 'top_path'))
    # Ensure we have our mandatory parameters
    _handle_missing_parameters('find_files2json', (search_name, search_folder), ('file_name', 'top_path'))
    # Fix: the names argument was previously the bare string ('top_path') instead of the one-element
    # tuple ('top_path',), which made error messages index into single characters of the string
    _handle_missing_folders('find_files2json', (search_folder,), ('top_path',))
    # Write the arguments
    json_args = {
        'FILES2JSON_SEARCH_NAME': search_name,
        'FILES2JSON_SEARCH_FOLDER': _replace_folder_path(search_folder, input_folder, '/input'),
        'FILES2JSON_JSON_FILE': '/output/found_files.json',
    }
    json_file_path = os.path.join(working_folder, 'args.json')
    _write_command_json(json_file_path, json_args)
    logging.debug("Command JSON: %s", str(json_args))
    # Run the command
    ret_value = _run_command('find_files2json', input_folder, working_folder, json_file_path, msg_func, err_func)
    command_results = None
    if ret_value == 0:
        command_results = _get_results_json(working_folder, err_func)
        # Map the container-side JSON path back to the host and pass it downstream
        command_results['found_json_file'] = _replace_folder_path(json_args['FILES2JSON_JSON_FILE'], '/output', working_folder)
        command_results['results_search_folder'] = json_args['FILES2JSON_SEARCH_FOLDER']
    return command_results
def handle_canopycover(parameters: tuple, input_folder: str, working_folder: str, msg_func: Callable, err_func: Callable) -> Optional[dict]:
    """Handle running the canopy cover algorithm
    Arguments:
        parameters: the specified parameters for the algorithm
        input_folder: the base folder where input files are located
        working_folder: the working folder for the algorithm
        msg_func: function to write messages to
        err_func: function to write errors to
    Return:
        A dictionary of additional parameters to pass to the next command or None
    """
    json_filename, experiment_file, search_folder, options = _find_parameter_values(
        parameters, ('found_json_file', 'experimentdata', 'results_search_folder', 'options'))
    # Validate the mandatory parameters
    _handle_missing_parameters('canopycover', (json_filename,), ('found_json_file',))
    _handle_missing_files('canopycover', (json_filename,), ('found_json_file',))
    # Repoint the found-files JSON at the output folder, best-guessing the source when needed;
    # fall back to the original file if repointing fails
    new_json_filename = _repoint_files_json_dir(json_filename, search_folder, '/output', working_folder)
    if new_json_filename is None:
        new_json_filename = json_filename
    options = '' if options is None else options
    # Append experiment metadata when a valid file was provided
    if experiment_file is not None:
        if os.path.isfile(experiment_file):
            options += ' --metadata ' + _replace_folder_path(experiment_file, input_folder, '/input')
        else:
            msg = 'Warning: invalid experiment file specified for canopy cover "%s"' % experiment_file
            logging.warning(msg)
            msg_func((msg,), True)
    # Write the arguments file for the containerized app
    json_args = {
        'CANOPYCOVER_OPTIONS': options,
    }
    json_file_path = os.path.join(working_folder, 'args.json')
    _write_command_json(json_file_path, json_args)
    logging.debug("Command JSON: %s", str(json_args))
    # Run the command with the found-files JSON mounted where the app expects it
    ret_value = _run_command('canopycover', input_folder, working_folder, json_file_path, msg_func, err_func,
                             [[new_json_filename, '/scif/apps/src/canopy_cover_files.json']])
    if ret_value != 0:
        return None
    # TODO: change top_path to prev_working_folder everywhere and make that a default addition for substitution (magic value)
    return {'results': _get_results_json(input_folder, err_func, True), 'top_path': working_folder}
def handle_greenness_indices(parameters: tuple, input_folder: str, working_folder: str, msg_func: Callable, err_func: Callable) -> \
        Optional[dict]:
    """Handle running the greenness algorithm
    Arguments:
        parameters: the specified parameters for the algorithm
        input_folder: the base folder where input files are located
        working_folder: the working folder for the algorithm
        msg_func: function to write messages to
        err_func: function to write errors to
    Return:
        A dictionary of additional parameters to pass to the next command or None
    """
    json_filename, experiment_file, search_folder, options = _find_parameter_values(
        parameters, ('found_json_file', 'experimentdata', 'results_search_folder', 'options'))
    # Validate the mandatory parameters
    _handle_missing_parameters('greenness indices', (json_filename,), ('found_json_file',))
    _handle_missing_files('greenness indices', (json_filename,), ('found_json_file',))
    # Repoint the found-files JSON at the output folder, best-guessing the source when needed;
    # fall back to the original file if repointing fails
    new_json_filename = _repoint_files_json_dir(json_filename, search_folder, '/output', working_folder)
    if new_json_filename is None:
        new_json_filename = json_filename
    options = '' if options is None else options
    # Append experiment metadata when a valid file was provided
    if experiment_file is not None:
        if os.path.isfile(experiment_file):
            options += ' --metadata ' + _replace_folder_path(experiment_file, input_folder, '/input')
        else:
            msg = 'Warning: invalid experiment file specified for greenness indices "%s"' % experiment_file
            logging.warning(msg)
            msg_func((msg,), True)
    # Write the arguments file for the containerized app
    json_args = {
        'GREENNESS_INDICES_OPTIONS': options,
    }
    json_file_path = os.path.join(working_folder, 'args.json')
    _write_command_json(json_file_path, json_args)
    logging.debug("Command JSON: %s", str(json_args))
    # Run the command with the found-files JSON mounted where the app expects it
    ret_value = _run_command('greenness-indices', input_folder, working_folder, json_file_path, msg_func, err_func,
                             [[new_json_filename, '/scif/apps/src/greenness-indices_files.json']])
    if ret_value != 0:
        return None
    return {'results': _get_results_json(input_folder, err_func, True), 'top_path': working_folder}
def handle_merge_csv(parameters: tuple, input_folder: str, working_folder: str, msg_func: Callable, err_func: Callable) -> Optional[dict]:
    """Handle running the merging csv files algorithm
    Arguments:
        parameters: the specified parameters for the algorithm
        input_folder: the base folder where input files are located
        working_folder: the working folder for the algorithm
        msg_func: function to write messages to
        err_func: function to write errors to
    Return:
        A dictionary of additional parameters to pass to the next command or None
    """
    # Pull out the expected arguments and validate the mandatory ones
    search_folder, options = _find_parameter_values(parameters, ('top_path', 'options'))
    _handle_missing_parameters('merge_csv', (search_folder,), ('top_path',))
    _handle_missing_folders('merge_csv', (search_folder,), ('top_path',))
    # Write the arguments file for the containerized app
    json_args = {
        'MERGECSV_SOURCE': _replace_folder_path(search_folder, input_folder, '/input'),
        'MERGECSV_TARGET': '/output',
        'MERGECSV_OPTIONS': options if options is not None else '',
    }
    json_file_path = os.path.join(working_folder, 'args.json')
    _write_command_json(json_file_path, json_args)
    logging.debug("Command JSON: %s", str(json_args))
    # Run the command and collect its results only on success
    if _run_command('merge_csv', input_folder, working_folder, json_file_path, msg_func, err_func) == 0:
        return _get_results_json(working_folder, err_func)
    return None
def handle_git_repo(git_repo: str, git_branch: str, parameters: tuple, input_folder: str, working_folder: str, msg_func: Callable, \
        err_func: Callable) -> Optional[dict]:
    """Handle running a git-based algorithm
    Arguments:
        git_repo: the URL of the repository to use
        git_branch: the source branch to use
        parameters: the specified parameters for the algorithm
        input_folder: the base folder where input files are located
        working_folder: the working folder for the algorithm
        msg_func: function to write messages to
        err_func: function to write errors to
    Return:
        A dictionary of additional parameters to pass to the next command or None
    """
    json_filename, experiment_file, search_folder, options = _find_parameter_values(
        parameters, ('found_json_file', 'experimentdata', 'results_search_folder', 'options'))
    # Validate the mandatory parameters; include the repo name in messages for context
    process_name = 'git repo ' + os.path.basename(git_repo)
    _handle_missing_parameters(process_name, (json_filename,), ('found_json_file',))
    _handle_missing_files(process_name, (json_filename,), ('found_json_file',))
    # Repoint the found-files JSON at the output folder, best-guessing the source when needed;
    # fall back to the original file if repointing fails
    new_json_filename = _repoint_files_json_dir(json_filename, search_folder, '/output', working_folder)
    if new_json_filename is None:
        new_json_filename = json_filename
    options = '' if options is None else options
    # Append experiment metadata when a valid file was provided
    if experiment_file is not None:
        if os.path.isfile(experiment_file):
            options += ' --metadata ' + _replace_folder_path(experiment_file, input_folder, '/input')
        else:
            msg = 'Warning: invalid experiment file specified for %s:%s "%s"' % (git_repo, git_branch, experiment_file)
            logging.warning(msg)
            msg_func((msg,), True)
    # Write the arguments file for the containerized app
    json_args = {
        'GIT_RGB_PLOT_REPO': git_repo,
        'GIT_RGB_PLOT_BRANCH': git_branch,
        'GIT_RGB_PLOT_OPTIONS': options,
    }
    json_file_path = os.path.join(working_folder, 'args.json')
    _write_command_json(json_file_path, json_args)
    logging.debug("Command JSON: %s", str(json_args))
    # Run the command with the found-files JSON mounted where the app expects it
    ret_value = _run_command('git_rgb_plot', input_folder, working_folder, json_file_path, msg_func, err_func,
                             [[new_json_filename, '/scif/apps/src/git_rgb_plot_files.json']])
    if ret_value != 0:
        return None
    return {'results': _get_results_json(input_folder, err_func, True), 'top_path': working_folder}