-
Notifications
You must be signed in to change notification settings - Fork 1
/
gen_inroom_rtss.py
executable file
·301 lines (253 loc) · 13.3 KB
/
gen_inroom_rtss.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
#!/usr/bin/env python
import glob
import logging
import sys
from datetime import datetime
from os import path as os_path
from pathlib import Path
from typing import Dict, List
import numpy as np
from pydicom import Dataset, Sequence, read_file, uid, write_file
# Copied and modified from ImageLoading.py from OnkoDICOM, which was LGPL 2.1 at the time
def img_stack_displacement(orientation: List[str], position: List[str]) -> float:
    """
    Project an image position onto the normal of the image plane.

    The normal is the cross product of the two direction vectors held
    in the image orientation patient value, so the result is the signed
    distance of the slice along the stack axis. It is meant to be used
    as a sort key that orders the slices of a stack whether they are
    axial, coronal or sagittal, and whatever order they were read in.

    :param orientation: Six values (strings or anything accepted by
        float()), the image orientation patient value of the dataset.
    :param position: Three values, the image position patient value of
        the dataset.
    :return: Position of the image along the image stack axis.
    """
    row_direction = np.array([float(value) for value in orientation[0:3]])
    column_direction = np.array([float(value) for value in orientation[3:6]])
    # The stack axis is perpendicular to both in-plane directions.
    stack_normal = np.cross(row_direction, column_direction)
    image_origin = np.array([float(value) for value in position])
    return stack_normal.dot(image_origin)
def get_dict_sort_on_displacement(item: tuple[str, Dataset]) -> float:
    """
    Sort key for (key, dataset) items of a dictionary of image datasets.

    :param item: dictionary item whose second element is a PyDicom
        dataset providing ImageOrientationPatient and
        ImagePositionPatient
    :return: Float of the projection of the image position patient on
        the axis through the image stack
    """
    dataset = item[1]
    return img_stack_displacement(
        dataset.ImageOrientationPatient,
        dataset.ImagePositionPatient,
    )
def image_stack_sort(read_data_dict: Dict[str, Dataset]) -> List[tuple[str, Dataset]]:
    """
    Order the entries of read_data_dict by their displacement along the
    image stack axis, largest displacement first. For axial images the
    displacement is the Z coordinate.

    :param read_data_dict: dictionary mapping a key (e.g. the file
        path) to the PyDicom dataset of one image
    :return: List of (key, dataset) tuples in descending order of
        displacement
    """
    stack_entries = list(read_data_dict.items())
    stack_entries.sort(key=get_dict_sort_on_displacement, reverse=True)
    return stack_entries
def get_stack_center(sorted_dict_on_displacement: List[tuple[str, Dataset]]) -> List[float]:
    """
    Compute the geometric center of a sorted stack of images.

    The center is the midpoint between the image position patient of
    the first image in the list and the position of the last pixel
    (last row, last column) of the last image in the list.

    :param sorted_dict_on_displacement: List of (key, dataset) tuples
        ordered along the stack axis, e.g. the result of
        image_stack_sort. Pixel spacing, rows, columns and orientation
        are taken from the last dataset only.
    :return: List of three floats, the coordinates of the stack center.
    :raises IndexError: if the list is empty.
    """
    if not sorted_dict_on_displacement:
        raise IndexError("Cannot compute the center of an empty image stack")

    first_ds = sorted_dict_on_displacement[0][1]
    last_ds = sorted_dict_on_displacement[-1][1]

    first_image_pos = np.array([float(value) for value in first_ds.ImagePositionPatient])
    logging.debug(f"First image position: {first_image_pos}")
    last_image_pos = np.array([float(value) for value in last_ds.ImagePositionPatient])
    logging.debug(f"Last Image Position: {last_image_pos}")

    # Pixel Spacing is (row spacing, column spacing),
    # see https://dicom.innolitics.com/ciods/ct-image/image-plane/00280030
    # Converted to float like every other geometry value used below.
    row_spacing = float(last_ds.PixelSpacing[0])
    logging.debug(f"Row spacing: {row_spacing}")
    column_spacing = float(last_ds.PixelSpacing[1])
    logging.debug(f"Column spacing: {column_spacing}")

    rows = last_ds.Rows
    logging.debug(f"Number of Rows: {rows}")
    cols = last_ds.Columns
    logging.debug(f"Number of Columns: {cols}")

    # Patient Position is only logged, never used in the calculation, so
    # its absence must not abort the computation.
    patient_position = getattr(last_ds, "PatientPosition", None)
    logging.debug(f"Patient Position (with respect to gravity and the Gantry): {patient_position}")

    orientation = last_ds.ImageOrientationPatient
    logging.debug(f"Image Orientation Patient: {orientation}")
    orient_x = np.array([float(value) for value in orientation[0:3]])
    orient_y = np.array([float(value) for value in orientation[3:6]])

    # Walking along a row changes the column index, hence column spacing
    # pairs with the first direction vector and row spacing with the second.
    column_last_pixel_displacement = (cols - 1) * orient_x * column_spacing
    row_last_pixel_displacement = (rows - 1) * orient_y * row_spacing
    last_image_last_pixel_pos = last_image_pos + column_last_pixel_displacement + row_last_pixel_displacement

    image_stack_isocenter_pos = 0.5 * (last_image_last_pixel_pos + first_image_pos)
    return image_stack_isocenter_pos.tolist()
def load_ct_headers_from_directory(ct_directory: Path) -> Dict[Path, Dataset]:
    """
    Read the headers of all CT images stored as *.dcm files directly in
    a directory.

    Files are read with force=True and without pixel data. Only datasets
    whose SOP Class UID is CT Image Storage are kept; files that carry a
    different SOP Class UID, or none at all, are skipped.

    :param ct_directory: directory to search (not recursive)
    :return: Dictionary mapping each file path, as returned by
        list_files, to the PyDicom dataset read from it
    """
    ds_dict = {}
    for file in list_files(ct_directory, "dcm"):
        ds = read_file(file, force=True, stop_before_pixels=True)
        # force=True lets files through that may have no SOPClassUID;
        # skip them instead of failing with AttributeError.
        if getattr(ds, "SOPClassUID", None) == uid.CTImageStorage:
            ds_dict[file] = ds
    return ds_dict
def list_files(filepath: Path, filetype: str) -> List[Path]:
    """
    List the files in a directory that have a given extension.

    :param filepath: directory to search (not recursive)
    :param filetype: file extension without the leading dot, e.g. "dcm"
    :return: List of matching paths; the elements are the strings
        produced by glob.glob
    """
    pattern = os_path.join(str(filepath), f"*.{filetype}")
    return glob.glob(pattern)
def get_stack_center_from_path(ct_directory: Path) -> List[float]:
    """
    Load the CT headers found in a directory, sort them along the stack
    axis and return the center of the resulting image stack.

    :param ct_directory: directory holding the CT image files
    :return: List of floats, the coordinates of the stack center
    """
    ordered_slices = image_stack_sort(load_ct_headers_from_directory(ct_directory))
    center = get_stack_center(ordered_slices)
    logging.debug(f"CT volume with {len(ordered_slices)} slices in {ct_directory} is centered at {center}")
    return center
def usage():
    """Print the command line synopsis followed by one line per argument."""
    help_lines = (
        f"{sys.argv[0]} ct_directory rt_ion_plan_file_path ref_rtss_file_path",
        "The ct_directory is used to find the CBCT isocenter and to provide patient and study information",
        "The RT Ion Plan is used to identify the (original) Frame of Reference UID and to validate the referenced RT SS UID",
        "The Referenced RT Structure Set is used to identify the Series and SOP Instance UIDs of the reference CT",
    )
    for help_line in help_lines:
        print(help_line)
def pre_populate_inroom_rtss_header(ct_ds: Dataset, inroom_rtss_ds: Dataset = None) -> Dataset:
    """
    Copy patient and study level data elements from a CT dataset into
    an RT Structure Set dataset.

    Elements are looked up by DICOM keyword. An element that is missing
    from the CT is reported on stdout and skipped.

    :param ct_ds: dataset of a CT image to copy the elements from
    :param inroom_rtss_ds: dataset to copy the elements into; a new
        empty Dataset is created when None
    :return: The dataset the elements were copied into (inroom_rtss_ds
        itself when one was passed in)
    """
    prepopulated_rtss_ds = Dataset() if inroom_rtss_ds is None else inroom_rtss_ds

    # Keywords must be the current DICOM data dictionary spellings
    # (PatientSex, not PatientsSex): an unknown keyword is
    # indistinguishable from a missing element and would never be copied.
    # NOTE(review): PatientName is not in this list - confirm whether
    # that is intentional.
    for key_word in (
        "StudyDate",
        "StudyTime",
        "AccessionNumber",
        "Manufacturer",
        "InstitutionName",
        "InstitutionAddress",
        "ReferringPhysicianName",
        "OperatorsName",
        "PatientID",
        "PatientBirthDate",
        "PatientSex",
        "StudyInstanceUID",
        "StudyID",
    ):
        try:
            prepopulated_rtss_ds[key_word] = ct_ds[key_word]
        except (KeyError, IndexError):
            print(f"{key_word} not found in CT")
    return prepopulated_rtss_ds
def populate_ifsseq0099_rtss(sorted_stack, ct_stack_center, inroom_rtss_ds):
    """
    Fill an RT Structure Set dataset, in place, with two POINT ROIs
    ("InitMatchIso" and "SetupIsocenter") located at the stack center.

    Sets the SOP common, series and structure set attributes (with
    freshly generated SOP Instance and Series Instance UIDs), the
    referenced frame of reference / study / series / image hierarchy,
    and the Structure Set ROI, ROI Contour and RT ROI Observations
    sequences.

    :param sorted_stack: list of (key, dataset) tuples of the CT images.
        The first dataset supplies the Frame of Reference, Study and
        Series UIDs; every dataset is listed in the Contour Image
        Sequence.
    :param ct_stack_center: coordinates stored as the Contour Data of
        both ROIs
    :param inroom_rtss_ds: dataset to populate; any existing values of
        the attributes listed above are overwritten
    :return: None
    """
    created = datetime.now()
    date_stamp = created.strftime("%Y%m%d")
    time_stamp = created.strftime("%H%M%S")
    reference_ct = sorted_stack[0][1]

    # (ROI number, ROI name, RT ROI interpreted type) - see IFSSEQ0099
    isocenter_rois = (
        (1, "InitMatchIso", "INITMATCHISO"),
        (2, "SetupIsocenter", "SETUPISOCENTER"),
    )

    # Top level attributes
    inroom_rtss_ds.StructureSetROISequence = Sequence()
    inroom_rtss_ds.ROIContourSequence = Sequence()
    inroom_rtss_ds.RTROIObservationsSequence = Sequence()
    inroom_rtss_ds.ReferencedFrameOfReferenceSequence = Sequence()
    inroom_rtss_ds.InstanceCreationDate = date_stamp
    inroom_rtss_ds.InstanceCreationTime = time_stamp
    inroom_rtss_ds.SOPClassUID = uid.RTStructureSetStorage
    inroom_rtss_ds.SOPInstanceUID = uid.generate_uid()  # could potentially use org root of CT as prefix
    inroom_rtss_ds.SeriesInstanceUID = uid.generate_uid()
    inroom_rtss_ds.Modality = "RTSTRUCT"
    inroom_rtss_ds.StructureSetLabel = "InRoom Isocenter"
    inroom_rtss_ds.StructureSetName = "RTSS for Setup CBCT"
    inroom_rtss_ds.StructureSetDescription = "IFSSEQ0099 compliant RT SS for positioning CT"
    inroom_rtss_ds.StructureSetDate = date_stamp
    inroom_rtss_ds.StructureSetTime = time_stamp

    # Referenced frame of reference -> study -> series -> CT images
    frame_item = Dataset()
    frame_item.FrameOfReferenceUID = reference_ct.FrameOfReferenceUID
    frame_item.RTReferencedStudySequence = Sequence()

    study_item = Dataset()
    # NOTE(review): this pairs the SOP Class UID of the CT image with the
    # Study Instance UID - confirm that is what IFSSEQ0099 expects.
    study_item.ReferencedSOPClassUID = reference_ct.SOPClassUID
    study_item.ReferencedSOPInstanceUID = reference_ct.StudyInstanceUID
    study_item.ReferencedSeriesSequence = Sequence()

    series_item = Dataset()
    series_item.SeriesInstanceUID = reference_ct.SeriesInstanceUID
    series_item.ContourImageSequence = Sequence()
    for stack_entry in sorted_stack:
        slice_ds = stack_entry[1]
        image_reference = Dataset()
        image_reference.ReferencedSOPClassUID = slice_ds.SOPClassUID
        image_reference.ReferencedSOPInstanceUID = slice_ds.SOPInstanceUID
        series_item.ContourImageSequence.append(image_reference)

    study_item.ReferencedSeriesSequence.append(series_item)
    frame_item.RTReferencedStudySequence.append(study_item)
    inroom_rtss_ds.ReferencedFrameOfReferenceSequence.append(frame_item)

    # Structure Set ROI Sequence: one entry per isocenter ROI
    for roi_number, roi_name, _ in isocenter_rois:
        roi_item = Dataset()
        roi_item.ROINumber = roi_number
        roi_item.ReferencedFrameOfReferenceUID = reference_ct.FrameOfReferenceUID
        roi_item.ROIName = roi_name
        roi_item.ROIDescription = "Isocenter of Treatment Machine"
        roi_item.ROIGenerationAlgorithm = "AUTOMATIC"
        roi_item.ROIGenerationDescription = "Extracted from Center of CBCT Image Volume"
        inroom_rtss_ds.StructureSetROISequence.append(roi_item)

    # ROI Contour Sequence: each ROI is a single point at the stack center
    for roi_number, _, _ in isocenter_rois:
        roi_contour_item = Dataset()
        roi_contour_item.ReferencedROINumber = roi_number
        roi_contour_item.ContourSequence = Sequence()
        point_item = Dataset()
        point_item.ContourNumber = 1
        point_item.ContourGeometricType = "POINT"
        point_item.NumberOfContourPoints = 1
        point_item.ContourData = ct_stack_center
        roi_contour_item.ContourSequence.append(point_item)
        inroom_rtss_ds.ROIContourSequence.append(roi_contour_item)

    # RT ROI Observations Sequence: observation number equals ROI number
    for roi_number, _, interpreted_type in isocenter_rois:
        observation_item = Dataset()
        observation_item.ObservationNumber = roi_number
        observation_item.ReferencedROINumber = roi_number
        observation_item.RTROIInterpretedType = interpreted_type
        observation_item.ROIInterpreter = ""  # use None instead of "" ?
        inroom_rtss_ds.RTROIObservationsSequence.append(observation_item)
if __name__ == "__main__":
    # Command line entry point.
    #   1 argument : print the center of the CT/CBCT stack, then the usage.
    #   3 arguments: additionally check that the RT Ion Plan references the
    #                given RT Structure Set and write a new RT Structure Set
    #                file "RS_<SOPInstanceUID>.dcm" in the current directory.
    num_args = len(sys.argv)
    if num_args < 2:
        usage()
        sys.exit("No arguments provided, must at least provide directory where in-room CT/CBCT data files are.")

    logging.basicConfig(level=logging.DEBUG, format="%(asctime)s|%(name)s|%(levelname)s|%(funcName)s|%(message)s")

    ct_directory = Path(sys.argv[1]).expanduser()
    if not ct_directory.exists():
        sys.exit(f"Unable to find {ct_directory}")

    # Same steps as get_stack_center_from_path(), done inline because the
    # sorted stack itself is needed again further down.
    dict_of_ct_headers = load_ct_headers_from_directory(ct_directory)
    sorted_stack = image_stack_sort(dict_of_ct_headers)
    if not sorted_stack:
        # Without this check the empty stack surfaces as an IndexError traceback.
        sys.exit(f"No CT image files (*.dcm) found in {ct_directory}")
    ct_stack_center = get_stack_center(sorted_stack)
    print(ct_stack_center)

    if num_args < 4:
        usage()
        sys.exit()

    ion_plan_ds = read_file(Path(sys.argv[2]).expanduser(), force=True)
    ref_rt_ss = read_file(Path(sys.argv[3]).expanduser(), force=True)

    # The plan must reference the structure set that was passed in.
    plan_ref_rtss = str(ion_plan_ds.ReferencedStructureSetSequence[0].ReferencedSOPInstanceUID)
    ref_rtss_uid = str(ref_rt_ss.SOPInstanceUID)
    if plan_ref_rtss != ref_rtss_uid:
        sys.exit(f"Referenced RT SS in plan: {plan_ref_rtss} doesn't match RT SS UID: {ref_rtss_uid}")

    # Pre-populate the inroom RT SS with data from the CT
    # Patient and Study Information
    first_ct_ds = sorted_stack[0][1]
    inroom_rtss_ds = pre_populate_inroom_rtss_header(first_ct_ds)
    populate_ifsseq0099_rtss(sorted_stack, ct_stack_center, inroom_rtss_ds)

    inroom_rtss_ds.is_implicit_VR = True
    inroom_rtss_ds.is_little_endian = True
    write_file(f"RS_{inroom_rtss_ds.SOPInstanceUID}.dcm", inroom_rtss_ds)