Add ultrasound confidence map to transforms
MrGranddy committed Jul 12, 2023
1 parent 17c1e3a commit 11a1399
Showing 1 changed file with 309 additions and 1 deletion.
310 changes: 309 additions & 1 deletion monai/transforms/intensity/array.py
@@ -17,7 +17,7 @@
from abc import abstractmethod
from collections.abc import Callable, Iterable, Sequence
from functools import partial
-from typing import Any
+from typing import Any, Tuple, Literal
from warnings import warn

import numpy as np
@@ -37,6 +37,10 @@
from monai.utils.type_conversion import convert_data_type, convert_to_dst_type, convert_to_tensor, get_equivalent_dtype

skimage, _ = optional_import("skimage", "0.19.0", min_version)
cv2, _ = optional_import("cv2")
Oct2Py, _ = optional_import("oct2py", "5.6.0", min_version, "Oct2Py")
csc_matrix, _ = optional_import("scipy.sparse", "1.7.1", min_version, "csc_matrix")
hilbert, _ = optional_import("scipy.signal", "1.7.1", min_version, "hilbert")

__all__ = [
"RandGaussianNoise",
@@ -77,6 +81,7 @@
"RandIntensityRemap",
"ForegroundMask",
"ComputeHoVerMaps",
"UltrasoundConfidenceMap",
]


@@ -2577,3 +2582,306 @@ def __call__(self, mask: NdarrayOrTensor):

hv_maps = convert_to_tensor(np.concatenate([h_map, v_map]), track_meta=get_track_meta())
return hv_maps

class UltrasoundConfidenceMap(Transform):
"""Compute confidence map from an ultrasound image.
This transform uses the method introduced by Karamalis et al. in https://doi.org/10.1016/j.media.2012.07.005.
It generates a confidence map by setting source and sink points in the image and computing the probability
for random walks to reach the source for each pixel.
Args:
alpha (float, optional): Alpha parameter. Defaults to 2.0.
beta (float, optional): Beta parameter. Defaults to 90.0.
gamma (float, optional): Gamma parameter. Defaults to 0.05.
mode (str, optional): 'RF' or 'B' mode data. Defaults to 'B'.
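
    Example (a minimal usage sketch, assuming ``us_image`` is a 2-D ``np.ndarray``
    with one scanline per column):

        >>> transform = UltrasoundConfidenceMap(alpha=2.0, beta=90.0, gamma=0.05, mode="B")
        >>> confidence = transform(us_image)  # same shape as the input, values in [0, 1]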
"""

def __init__(
self,
alpha: float = 2.0,
beta: float = 90.0,
gamma: float = 0.05,
mode: Literal["RF", "B"] = "B",
    ):
        self.alpha = alpha
self.beta = beta
self.gamma = gamma
self.mode = mode

# The precision to use for all computations
self.eps = np.finfo("float64").eps

# Octave instance for computing the confidence map
self.oc = Oct2Py()

    def sub2ind(self, size: Tuple[int, int], rows: np.ndarray, cols: np.ndarray) -> np.ndarray:
        """Convert row and column subscripts into linear indices,
        basically a copy of the MATLAB function of the same name.
        https://www.mathworks.com/help/matlab/ref/sub2ind.html

        This function is Pythonic, so the indices start at 0.

        Args:
            size (Tuple[int, int]): Size of the matrix
            rows (np.ndarray): Row indices
            cols (np.ndarray): Column indices

        Returns:
            indices (np.ndarray): 1-D array of linear indices
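
        Example (illustrative, assuming ``cmap_gen = UltrasoundConfidenceMap()``):
            >>> cmap_gen.sub2ind((3, 4), np.array([1]), np.array([2]))  # 1 + 2 * 3
            array([7])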
"""
indices = rows + cols * size[0]
return indices

    def get_seed_and_labels(self, data: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Get the seed and label arrays for the random walks framework.

        Args:
            data: Input array

        Returns:
            Tuple[np.ndarray, np.ndarray]: Seed and label arrays
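
        Example (assumed 3x4 input): the sources are the first-row pixels at
        column-major linear indices [0, 3, 6, 9] (label 1) and the sinks are the
        last-row pixels at [2, 5, 8, 11] (label 2).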
"""

# Seeds and labels (boundary conditions)
seeds = np.array([], dtype="float64")
labels = np.array([], dtype="float64")

# Indices for all columns
sc = np.arange(data.shape[1], dtype="float64")

# SOURCE ELEMENTS - 1st matrix row
# Indices for 1st row, it will be broadcasted with sc
sr_up = np.array([0])
seed = self.sub2ind(data.shape, sr_up, sc).astype("float64")
seed = np.unique(seed)
seeds = np.concatenate((seeds, seed))

# Label 1
label = np.ones_like(seed)
labels = np.concatenate((labels, label))

# SINK ELEMENTS - last image row
sr_down = np.ones_like(sc) * (data.shape[0] - 1)
seed = self.sub2ind(data.shape, sr_down, sc).astype("float64")

seed = np.unique(seed)
seeds = np.concatenate((seeds, seed))

# Label 2
label = np.ones_like(seed) * 2
labels = np.concatenate((labels, label))

return seeds, labels

def normalize(self, inp: np.ndarray) -> np.ndarray:
"""Normalize an array to [0, 1]"""
return (inp - np.min(inp)) / (np.ptp(inp) + self.eps)

    def attenuation_weighting(self, A: np.ndarray, alpha: float) -> np.ndarray:
        """Compute attenuation weighting.

        Args:
            A (np.ndarray): Image
            alpha: Attenuation coefficient (see publication)

        Returns:
            W (np.ndarray): Weighting expressing depth-dependent attenuation
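
        Following the Beer-Lambert law, the weight at normalized depth ``d`` is
        ``W(d) = 1 - exp(-alpha * d)``; e.g. with the default ``alpha = 2.0`` the
        deepest row is weighted by ``1 - exp(-2) ~= 0.865``.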
"""

# Create depth vector and repeat it for each column
Dw = np.linspace(0, 1, A.shape[0], dtype="float64")
Dw = np.tile(Dw.reshape(-1, 1), (1, A.shape[1]))

W = 1.0 - np.exp(-alpha * Dw) # Compute exp inline

return W

def confidence_laplacian(
self, P: np.ndarray, A: np.ndarray, beta: float, gamma: float
) -> csc_matrix: # type: ignore
"""Compute 6-Connected Laplacian for confidence estimation problem
Args:
P (np.ndarray): The index matrix of the image with boundary padding.
A (np.ndarray): The padded image.
beta (float): Random walks parameter that defines the sensitivity of the Gaussian weighting function.
gamma (float): Horizontal penalty factor that adjusts the weight of horizontal edges in the Laplacian.
Returns:
L (csc_matrix): The 6-connected Laplacian matrix used for confidence map estimation.
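
        Edge weights follow the Gaussian weighting function from the paper,
        ``w_ij = exp(-beta * |c_i - c_j|)`` (plus a small epsilon for numerical
        stability), where the intensities ``c`` already carry the attenuation weighting.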
"""

m, _ = P.shape

P = P.T.flatten()
A = A.T.flatten()

p = np.where(P > 0)[0]

i = P[p] - 1 # Index vector
j = P[p] - 1 # Index vector
# Entries vector, initially for diagonal
s = np.zeros_like(p, dtype="float64")

edge_templates = [
-1, # Vertical edges
1,
m - 1, # Diagonal edges
m + 1,
-m - 1,
-m + 1,
m, # Horizontal edges
-m,
]
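        # Offsets index the column-major flattened padded image: +/-1 moves within
        # a column (vertical), +/-m moves to the neighboring column (horizontal),
        # and m-1, m+1, -m-1, -m+1 reach the diagonal neighbors.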

vertical_end = None
diagonal_end = None

        for iter_idx, k in enumerate(edge_templates):
            Q = P[p + k]

q = np.where(Q > 0)[0]

ii = P[p[q]] - 1
i = np.concatenate((i, ii))
jj = Q[q] - 1
j = np.concatenate((j, jj))
            W = np.abs(A[p[ii]] - A[p[jj]])  # Intensity derived weight: |A(p) - A(p + k)| per node/neighbor pair
s = np.concatenate((s, W))

if iter_idx == 1:
vertical_end = s.shape[0] # Vertical edges length
elif iter_idx == 5:
diagonal_end = s.shape[0] # Diagonal edges length

# Normalize weights
s = self.normalize(s)

# Horizontal penalty
s[:vertical_end] += gamma
        # s[vertical_end:diagonal_end] += gamma * np.sqrt(2)  # in the paper the diagonal penalty
        # is scaled by sqrt(2) since diagonal edges are longer, but the original code omits it

# Normalize differences
s = self.normalize(s)

# Gaussian weighting function
        s = -((np.exp(-beta * s, dtype="float64")) + 1.0e-6)  # note: this epsilon changes the results drastically (default: 1e-6)

# Create Laplacian, diagonal missing
L = csc_matrix((s, (i, j)))

# Reset diagonal weights to zero for summing
# up the weighted edge degree in the next step
L.setdiag(0)

# Weighted edge degree
D = np.abs(L.sum(axis=0).A)[0]

# Finalize Laplacian by completing the diagonal
L.setdiag(D)

return L

    def confidence_estimation(self, A: np.ndarray, seeds: np.ndarray, labels: np.ndarray, beta: float, gamma: float) -> np.ndarray:
        """Compute the confidence map.

        Args:
            A (np.ndarray): Processed image.
            seeds (np.ndarray): Seeds for the random walks framework. These are indices of the source and sink nodes.
            labels (np.ndarray): Labels for the random walks framework. These represent the classes or groups of the seeds.
            beta: Random walks parameter that defines the sensitivity of the Gaussian weighting function.
            gamma: Horizontal penalty factor that adjusts the weight of horizontal edges in the Laplacian.

        Returns:
            map: Confidence map which shows the probability of each pixel belonging to the source or sink group.
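
        Following the random walker formulation (Grady, 2006), the potentials of
        the unmarked nodes are obtained by solving the linear system
        ``L_U x_U = -B^T x_M``, where ``L_U`` is the Laplacian restricted to the
        unmarked nodes and ``x_M`` encodes the source seeds.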
"""

# Index matrix with boundary padding
G = np.arange(1, A.shape[0] * A.shape[1] + 1).reshape(A.shape[1], A.shape[0]).T
pad = 1

G = np.pad(G, (pad, pad), "constant", constant_values=(0, 0))
B = np.pad(A, (pad, pad), "constant", constant_values=(0, 0))

# Laplacian
D = self.confidence_laplacian(G, B, beta, gamma)

        # Select marked columns from Laplacian to create L_M and B^T
        B = D[:, seeds.astype(int)]

# Select marked nodes to create B^T
N = np.sum(G > 0).item()
i_U = np.setdiff1d(np.arange(N), seeds.astype(int)) # Index of unmarked nodes
B = B[i_U, :]

# Remove marked nodes from Laplacian by deleting rows and cols
        keep_indices = np.setdiff1d(np.arange(D.shape[0]), seeds.astype(int))
D = csc_matrix(D[keep_indices, :][:, keep_indices])

# Define M matrix
M = np.zeros((seeds.shape[0], 1), dtype="float64")
M[:, 0] = labels == 1

        # Right-hand side (-B^T * M)
        rhs = -B @ M  # type: ignore

        # Solve the system exactly with Octave's mldivide (backslash) operator
        x = self.oc.mldivide(D, rhs)[:, 0]

# Prepare output
probabilities = np.zeros((N,), dtype="float64")
# Probabilities for unmarked nodes
probabilities[i_U] = x
        # Source seeds get maximum probability (sink seeds keep the initial 0)
        probabilities[seeds[labels == 1].astype(int)] = 1.0

# Final reshape with same size as input image (no padding)
probabilities = probabilities.reshape((A.shape[1], A.shape[0])).T

return probabilities

    def __call__(self, data: np.ndarray, downsample=None) -> np.ndarray:
        """Compute the confidence map.

        Args:
            data (np.ndarray): Ultrasound data (one scanline per column); treated as RF data when ``mode`` is 'RF'.
            downsample (int, optional): Downsampling factor applied before the computation; the
                resulting map is resized back to the original shape. Defaults to None.

        Returns:
            map (np.ndarray): Confidence map
"""

# Normalize data
data = data.astype("float64")
data = self.normalize(data)

if self.mode == "RF":
# MATLAB hilbert applies the Hilbert transform to columns
data = np.abs(hilbert(data, axis=0)).astype("float64") # type: ignore

org_H, org_W = data.shape
if downsample is not None:
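            # note: cv2.resize expects dsize as (width, height), hence the (W, H) order below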
data = cv2.resize(data, (org_W // downsample, org_H // downsample), interpolation=cv2.INTER_CUBIC)

seeds, labels = self.get_seed_and_labels(data)

# Attenuation with Beer-Lambert
W = self.attenuation_weighting(data, self.alpha)

# Apply weighting directly to image
# Same as applying it individually during the formation of the
# Laplacian
data = data * W

        # Find confidence values
map_ = self.confidence_estimation(data, seeds, labels, self.beta, self.gamma)

if downsample is not None:
map_ = cv2.resize(map_, (org_W, org_H), interpolation=cv2.INTER_CUBIC)

return map_
