-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathlocal.py
212 lines (185 loc) · 7.49 KB
/
local.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
import os
import imageio
import matplotlib.pyplot as plt
from matplotlib import gridspec, ticker
import numpy as np
from PIL import Image
import torch
from torch.utils.data import Dataset
from torchvision import transforms
from mpl_toolkits.axes_grid1 import make_axes_locatable
from skimage.segmentation import relabel_sequential
from scipy.optimize import linear_sum_assignment
def show_one_image(image_path):
    """Read the image at ``image_path`` from disk and display it with matplotlib."""
    plt.imshow(imageio.imread(image_path))
class NucleiDataset(Dataset):
    """A PyTorch dataset to load cell images and nuclei masks.

    Each sample lives in its own sub-directory of ``root_dir`` and contains an
    ``image.tif`` (raw image) and a ``mask.tif`` (nuclei mask). All pairs are
    loaded into memory up front so ``__getitem__`` is cheap.

    Args:
        root_dir: directory with one sub-directory per sample.
        transform: transformations applied to both image and mask
            (e.g. geometric augmentations).
        img_transform: transformations applied to the raw image only
            (e.g. intensity augmentations).
    """

    def __init__(self, root_dir, transform=None, img_transform=None):
        # NOTE(review): prefixing "./" breaks absolute paths; kept as-is for
        # backward compatibility with existing relative-path callers.
        self.root_dir = (
            "./" + root_dir
        )  # the directory with all the training samples
        # Sort for a deterministic sample order: os.listdir returns entries
        # in arbitrary, filesystem-dependent order, which would otherwise make
        # dataset indices differ across machines/runs.
        self.samples = sorted(os.listdir(self.root_dir))
        self.transform = (
            transform  # transformations to apply to both inputs and targets
        )
        self.img_transform = img_transform  # transformations to apply to raw image only
        # transformations to apply just to inputs
        inp_transforms = transforms.Compose(
            [
                transforms.Grayscale(),
                transforms.ToTensor(),
                transforms.Normalize([0.5], [0.5]),  # mean=0.5, std=0.5
            ]
        )
        # Eagerly load every image/mask pair into memory.
        self.loaded_imgs = [None] * len(self.samples)
        self.loaded_masks = [None] * len(self.samples)
        for sample_ind in range(len(self.samples)):
            img_path = os.path.join(
                self.root_dir, self.samples[sample_ind], "image.tif"
            )
            image = Image.open(img_path)
            image.load()  # force the lazy PIL file handle to be read now
            self.loaded_imgs[sample_ind] = inp_transforms(image)
            mask_path = os.path.join(
                self.root_dir, self.samples[sample_ind], "mask.tif"
            )
            mask = Image.open(mask_path)
            mask.load()
            self.loaded_masks[sample_ind] = transforms.ToTensor()(mask)

    def __len__(self):
        """Return the total number of samples."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Return the ``(image, mask)`` pair at ``idx`` with transforms applied."""
        image = self.loaded_imgs[idx]
        mask = self.loaded_masks[idx]
        if self.transform is not None:
            # Re-seed with the same seed before each call so the *same* random
            # transform is applied to both the image and the mask.
            seed = torch.seed()
            torch.manual_seed(seed)
            image = self.transform(image)
            torch.manual_seed(seed)
            mask = self.transform(mask)
        if self.img_transform is not None:
            image = self.img_transform(image)
        return image, mask
def show_random_dataset_image(dataset):
    """Plot the image and mask of one randomly chosen sample from ``dataset``."""
    idx = np.random.randint(0, len(dataset))  # take a random sample
    img, mask = dataset[idx]  # get the image and the nuclei masks
    f, axarr = plt.subplots(1, 2)  # make two plots on one figure
    axarr[0].imshow(img[0])  # show the image
    axarr[0].set_title("Image")
    # interpolation="none" (the string) disables smoothing for label images;
    # passing None silently falls back to the rcParams default interpolation.
    axarr[1].imshow(mask[0], interpolation="none")  # show the masks
    axarr[1].set_title("Mask")
    _ = [ax.axis("off") for ax in axarr]  # remove the axes
    # Bug fix: the old "%s" % {img[0].shape} wrapped the shape in a *set*
    # literal, printing "{torch.Size([...])}" instead of the shape itself.
    print("Image size is %s" % (img[0].shape,))
    plt.show()
def show_random_dataset_image_with_prediction(dataset, model, device="cpu"):
    """Plot image, ground-truth mask and model prediction for a random sample.

    Also prints the per-pixel MSE between the mask and the prediction.
    """
    idx = np.random.randint(0, len(dataset))  # take a random sample
    img, mask = dataset[idx]  # get the image and the nuclei masks
    x = img.to(device).unsqueeze(0)  # add a batch dimension for the model
    y = model(x)[0].detach().cpu().numpy()
    print("MSE loss:", np.mean((mask[0].numpy() - y[0]) ** 2))
    f, axarr = plt.subplots(1, 3)  # make three plots on one figure
    axarr[0].imshow(img[0])  # show the image
    axarr[0].set_title("Image")
    # interpolation="none" (the string) disables smoothing; passing None
    # silently falls back to the rcParams default interpolation.
    axarr[1].imshow(mask[0], interpolation="none")  # show the masks
    axarr[1].set_title("Mask")
    axarr[2].imshow(y[0], interpolation="none")  # show the prediction
    axarr[2].set_title("Prediction")
    _ = [ax.axis("off") for ax in axarr]  # remove the axes
    # Bug fix: the old "%s" % {img[0].shape} wrapped the shape in a *set*
    # literal, printing "{torch.Size([...])}" instead of the shape itself.
    print("Image size is %s" % (img[0].shape,))
    plt.show()
def show_random_augmentation_comparison(dataset_a, dataset_b):
    """Plot the same random sample from two datasets side by side.

    ``dataset_a`` is shown in the top row and ``dataset_b`` (typically the
    augmented variant of the same data) in the bottom row. Both datasets must
    have the same length so index ``idx`` refers to the same sample in each.
    """
    assert len(dataset_a) == len(dataset_b)
    idx = np.random.randint(0, len(dataset_a))  # take a random sample
    img_a, mask_a = dataset_a[idx]  # get the image and the nuclei masks
    img_b, mask_b = dataset_b[idx]  # get the image and the nuclei masks
    f, axarr = plt.subplots(2, 2)  # make four plots on one figure
    axarr[0, 0].imshow(img_a[0])  # show the image
    axarr[0, 0].set_title("Image")
    # interpolation="none" (the string) disables smoothing for label images;
    # passing None silently falls back to the rcParams default interpolation.
    axarr[0, 1].imshow(mask_a[0], interpolation="none")  # show the masks
    axarr[0, 1].set_title("Mask")
    axarr[1, 0].imshow(img_b[0])  # show the augmented image
    axarr[1, 0].set_title("Augmented Image")
    axarr[1, 1].imshow(mask_b[0], interpolation="none")  # show the augmented mask
    axarr[1, 1].set_title("Augmented Mask")
    _ = [ax.axis("off") for ax in axarr.flatten()]  # remove the axes
    plt.show()
def train(
    model,
    loader,
    optimizer,
    loss_function,
    epoch,
    log_interval=100,
    log_image_interval=20,
    tb_logger=None,
    device=None,
    early_stop=False,
):
    """Train ``model`` for one epoch over the batches of ``loader``.

    Args:
        model: the network to train; moved to ``device`` and set to train mode.
        loader: a DataLoader yielding ``(input, target)`` batches.
        optimizer: optimizer stepping the model parameters.
        loss_function: callable mapping ``(prediction, target)`` to a scalar loss.
        epoch: current epoch index; used for logging only.
        log_interval: print the loss to the console every this many batches.
        log_image_interval: log image panels to tensorboard every this many
            global steps (only when ``tb_logger`` is given).
        tb_logger: optional tensorboard ``SummaryWriter``-like logger.
        device: torch device to train on; defaults to cuda when available.
        early_stop: when True, stop after a handful of batches (smoke testing).
    """
    if device is None:
        # You can pass in a device or we will default to using
        # the gpu. Feel free to try training on the cpu to see
        # what sort of performance difference there is
        if torch.cuda.is_available():
            device = torch.device("cuda")
        else:
            device = torch.device("cpu")
    # set the model to train mode
    model.train()
    # move model to device
    model = model.to(device)
    # iterate over the batches of this epoch
    for batch_id, (x, y) in enumerate(loader):
        # move input and target to the active device (either cpu or gpu)
        x, y = x.to(device), y.to(device)
        # zero the gradients for this iteration
        optimizer.zero_grad()
        # apply model and calculate loss
        prediction = model(x)
        if prediction.shape != y.shape:
            # NOTE(review): `crop` is not defined or imported in this file —
            # confirm it is provided elsewhere before relying on this branch
            # (e.g. a valid-padding U-Net whose output is smaller than y).
            y = crop(y, prediction)
        if y.dtype != prediction.dtype:
            y = y.type(prediction.dtype)
        loss = loss_function(prediction, y)
        # backpropagate the loss and adjust the parameters
        loss.backward()
        optimizer.step()
        # log to console
        if batch_id % log_interval == 0:
            print(
                "Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}".format(
                    epoch,
                    batch_id * len(x),
                    len(loader.dataset),
                    100.0 * batch_id / len(loader),
                    loss.item(),
                )
            )
        # log to tensorboard
        if tb_logger is not None:
            step = epoch * len(loader) + batch_id
            tb_logger.add_scalar(
                tag="train_loss", scalar_value=loss.item(), global_step=step
            )
            # check if we log images in this iteration
            if step % log_image_interval == 0:
                tb_logger.add_images(
                    tag="input", img_tensor=x.to("cpu"), global_step=step
                )
                tb_logger.add_images(
                    tag="target", img_tensor=y.to("cpu"), global_step=step
                )
                tb_logger.add_images(
                    tag="prediction",
                    img_tensor=prediction.to("cpu").detach(),
                    global_step=step,
                )
        if early_stop and batch_id > 5:
            # Bug fix: the message said "test" inside the training loop.
            print("Stopping training early!")
            break