-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathprocess.py
114 lines (100 loc) · 3.99 KB
/
process.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
"""
process.py
Author: Ian Smith
Description: This module should contain all operations related to calling/executing image processing algorithms.
"""
import constants, ip_utils
from job import JobData
import os
import subprocess
import traceback
class Processor:
    """
    A class to handle the processing of images, uses a factory method to decide what type of job a job is based off of
    the job's metadata.

    Attributes:
        logs: injected logger; only ``log_debug`` and ``log_error`` are used here.
        file_manager: injected manager; only ``move(src, dest)`` is used here.
        current: JobData of the job being processed, or None when idle.
        process: ``subprocess.Popen`` handle of the running external job, or None when no job is running.
    """

    def __init__(self, logger, file_manager):
        """
        Constructor method
        :param logger: Injected Logger from the ip_logger class
        :param file_manager: A JobManager class instance
        """
        self.logs = logger
        self.file_manager = file_manager
        self.current = None
        self.process = None
        self._perform_startup()

    def _perform_startup(self):
        """
        On startup, clears the destination directory of jobs by moving every path returned by
        ip_utils.get_abs_paths(constants.DEST) into constants.BATCHES.
        NOTE(review): presumably this re-queues jobs left over from a previous run - confirm.
        :return: None
        """
        self.logs.log_debug("Retrieving files from destination dir")
        for path in ip_utils.get_abs_paths(constants.DEST):
            self.file_manager.move(path, constants.BATCHES)

    def process_image(self, job_base):
        """
        Method to initialize the processing of an image.
        Errors raised while processing are logged and reported through the return value rather than propagated.
        Errors raised while constructing JobData are not caught here.
        :param job_base: Path to the job
        :return: True if the job was processed successfully, False if processing raised an error
        """
        job_data = JobData(job_base)
        self.current = job_data
        try:
            self._get_processor(job_data)
            return True
        except FileNotFoundError as e:
            self.logs.log_error(f"FileNotFoundError: {e}")
            return False
        except NotImplementedError as e:
            self.logs.log_error(f"NotImplementedError: {e}")
            return False
        except Exception as e:
            # Last-resort boundary: one failing job is logged with its traceback and reported as False.
            self.logs.log_error("An error has occurred with {}: {}".format(job_data.base_name, e))
            self.logs.log_error(traceback.format_exc())
            return False
        finally:
            # Runs on every path (including the successful return) so the processor always ends up idle.
            self.process = None
            self.current = None

    def _get_processor(self, job_data):
        """
        Factory method to choose the processing method for selected job
        :param job_data: JobData
        :return: None
        :raises NotImplementedError: if the job type is missing, not a string, or not supported
        """
        job_type = job_data.data.get(constants.JOB_TYPE)
        # .get() yields None when the key is absent; report that the same way as any other unsupported type
        # instead of failing with an AttributeError on .lower().
        if not isinstance(job_type, str):
            raise NotImplementedError(f"Job Type: {job_type} not implemented in this system.")
        job_type = job_type.lower()
        if job_type == "radius_tibia_final":
            self._radius_tibia_final(job_data)
        else:
            raise NotImplementedError(f"Job Type: {job_type} not implemented in this system.")

    def _radius_tibia_final(self, job_data):
        """
        Method to execute the radius-tibia image segmentation, raises error on failed processing job.
        Blocks until the external process exits. While it runs, self.process holds the live Popen handle so that
        shutdown() can terminate it.
        :param job_data: JobData instance
        :return: None
        :raises FileNotFoundError: if the interpreter or the segmentation script path does not exist
        :raises subprocess.CalledProcessError: if the external process exits with a non-zero return code
        """
        self.logs.log_debug("Processing {}".format(job_data.image_file_name))
        # Checking that the path to the env to run the segmentation exists and that the path to segment.py exists
        if not os.path.exists(constants.RAD_TIB_PATH_TO_ENV):
            raise FileNotFoundError("Path to bl_torch python executable does not exist")
        if not os.path.exists(constants.RAD_TIB_PATH_TO_START):
            raise FileNotFoundError("Path to HR-pQCT-Segmentation segment.py does not exist")
        # The first item in the list is the path to the python interpreter with the conda env and the
        # 2nd is the path to run the model
        cmd = [constants.RAD_TIB_PATH_TO_ENV, constants.RAD_TIB_PATH_TO_START, job_data.base,
               constants.RAD_TIB_TRAINED_MODELS, "--image-pattern", job_data.image_file_name.lower()]
        # Popen (rather than subprocess.run) so the handle is published *before* waiting; run() only returns a
        # CompletedProcess after the child has exited, and that object cannot be killed.
        with subprocess.Popen(cmd) as proc:
            self.process = proc
            try:
                return_code = proc.wait()
            except BaseException:
                # Same clean-up subprocess.run performs: do not leave the child running if the wait is interrupted.
                proc.kill()
                raise
        if return_code == 0:
            self.logs.log_debug("radius-tibia-final job {} finished successfully".format(job_data.base_name))
        else:
            raise subprocess.CalledProcessError(return_code, cmd)

    def shutdown(self):
        """
        Method to shut down the processing module. Kills the external job process if one is registered;
        does nothing when the processor is idle.
        :return: None
        """
        # Take a local reference: process_image() resets self.process to None when a job ends.
        proc = self.process
        if proc is None:
            return
        try:
            proc.kill()
        except ProcessLookupError:
            # The child exited between the check and the kill; nothing left to stop.
            pass