Source code for miblab.dlsegkidney

import os
import sys

import numpy as np

from miblab.data import zenodo_fetch
from miblab.data import clear_cache_datafiles


if sys.version_info < (3, 9):
    # importlib.resources either doesn't exist or lacks the files()
    # function, so use the PyPI version:
    import importlib_resources
else:
    # importlib.resources has files(), so use that:
    import importlib.resources as importlib_resources

try:
    from monai.networks.nets.unetr import UNETR
    from monai.inferers import sliding_window_inference
    monai_installed = True
except ImportError:
    monai_installed = False

try:
    import torch
    torch_installed = True
except ImportError:
    torch_installed = False

try:
    import nibabel as nib
    nib_installed = True
except ImportError:
    nib_installed = False

try:
    from nnunetv2.inference.predict_from_raw_data import nnUNetPredictor
    nnunetv2 = True
except ImportError:
    nnunetv2 = False

try:
    import scipy.ndimage as ndi
    scipy_installed = True
except ImportError:
    scipy_installed = False


[docs] def kidney_pc_dixon(input_array,model='unetr', overlap=0.3, postproc=True, clear_cache = False, verbose=False): """ Segment individual kidneys on post-contrast Dixon images. This requires 4-channel input data with out-phase images, in-phase images, water maps, and fat maps. Args: input_array (numpy.ndarray): A 4D numpy array of shape [x, y, z, contrast] representing the input medical image volume. The last index must contain out-phase, in-phase, water and fat images, in that order. model: model = 'unetr' uses a pretrained UNETR-based model in MONAI, hosted on `Zenodo <https://zenodo.org/records/15521814>`_ model = 'nnunet' uses a pretrained nnUNet-based model, hosted on `Zenodo <https://zenodo.org/records/15328218>`_ under the hood this runs nnUNetPredictor (for more details `MIC-DKFZ Wiki <https://deepwiki.com/MIC-DKFZ/nnUNet>`_) overlap (float): only valid for model = 'unetr' defines the amount of overlap between adjacent sliding window patches during inference. A higher value (e.g., 0.5) improves prediction smoothness at patch borders but increases computation time. postproc (bool): If True, applies post-processing to select the largest connected component from the UNETR output for each kidney mask clear_cache: If True, the downloaded pth file is removed again after running the inference. verbose (bool): If True, prints logging messages. Returns: dict: A dictionary with the keys 'leftkidney' and 'rightkidney', each containing a binary NumPy array representing the respective kidney mask. Example: >>> import numpy as np >>> import miblab >>> data = np.random.rand(128, 128, 30, 4) >>> mask = miblab.kidney_pc_dixon(data) >>> print(mask['kidney_left']) [0 1 1 ... 0 0 0] """ if not torch_installed: raise ImportError( 'torch is not installed. Please install it with "pip install torch".' 'To install all dlseg options at once, install miblab as pip install miblab[dlseg].' ) if not monai_installed: raise ImportError( 'totalsegmentator is not installed. Please install it with "pip install totalsegmentator".' 'To install all dlseg options at once, install miblab as pip install miblab[dlseg].' ) if model == 'unetr': MODEL = 'UNETR_kidneys_v2.pth' MODEL_DOI = "15521814" if verbose: print('Downloading model..') temp_dir = importlib_resources.files('miblab.datafiles') weights_path = zenodo_fetch(MODEL, temp_dir, MODEL_DOI) if verbose: print('Applying model to data..') # Setup device os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" device_str = "cuda" if torch.cuda.is_available() else "cpu" device = torch.device(device_str) # Define model architecture model = UNETR( in_channels=4, out_channels=3, # BACKGROUND, RIGHT KIDNEY (left on image), LEFT KIDNEY (right on image) img_size=(80, 80, 80), feature_size=16, hidden_size=768, mlp_dim=3072, num_heads=12, proj_type="perceptron", norm_name="instance", res_block=True, dropout_rate=0.0, ).to(device) input_array = np.transpose(input_array, (3, 0, 1, 2)) # from (x,y,z,c) to (c,y,x,z) # Normalize data input_array_out = (input_array[0,...]-np.average(input_array[0,...]))/np.std(input_array[0,...]) input_array_in = (input_array[1,...]-np.average(input_array[1,...]))/np.std(input_array[1,...]) input_array_water = (input_array[2,...]-np.average(input_array[2,...]))/np.std(input_array[2,...]) input_array_fat = (input_array[3,...]-np.average(input_array[3,...]))/np.std(input_array[3,...]) input_array = np.stack((input_array_out, input_array_in, input_array_water, input_array_fat), axis=0) # Convert to NCHW[D] format: (1,c,y,x,z) # NCHW[D] stands for: batch N, channels C, height H, width W, depth D input_array = input_array.transpose(0,2,1,3) # from (x,y,z) to (y,x,z) input_array = np.expand_dims(input_array, axis=(0)) # Convert to tensor input_tensor = torch.tensor(input_array) # Load model weights weights = torch.load(weights_path, map_location=device) model.load_state_dict(weights) model.eval() with torch.no_grad(): output_tensor = sliding_window_inference(input_tensor, (80,80,80), 4, model, overlap=overlap, device=device_str, progress=True) if verbose: print('Post-processing results...') # From probabilities for each channel to label image output_tensor = torch.argmax(output_tensor, dim=1) # Convert to numpy output_array = output_tensor.numpy(force=True)[0,:,:,:] # Transpose to original shape output_array = output_array.transpose(1,0,2) #from (y,x,z) to (x,y,z) elif model == 'nnunet': MODEL = 'nnunet_kidneys_v2.zip' MODEL_DOI = "15328218" if not nib_installed: raise ImportError( 'nibabel is not installed. Please install it with "pip install nibabel".' 'To install all dlseg options at once, install miblab as pip install miblab[dlseg].' ) if not scipy_installed: raise ImportError( 'scipy is not installed. Please install it with "pip install scipy".' 'To install all dlseg options at once, install miblab as pip install miblab[dlseg].' ) if verbose: print('Downloading model..') temp_dir = importlib_resources.files('miblab.datafiles') weights_path = zenodo_fetch(MODEL, temp_dir, MODEL_DOI,extract=True) if verbose: print('Applying model to data..') # Check is device has cuda (to speed up inference) os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID" device_str = "cuda" if torch.cuda.is_available() else "cpu" # Setup device predictor = nnUNetPredictor( tile_step_size=0.75, use_gaussian=True, use_mirroring=False, perform_everything_on_device=True, device=torch.device(device_str), verbose=False, verbose_preprocessing=False, allow_tqdm=True ) # direct to "nnUNetTrainer__nnUNetPlans__3d_fullres" nested_folder = os.path.join(weights_path, "nnUNetTrainer__nnUNetPlans__3d_fullres") # Setup the predictor predictor.initialize_from_trained_model_folder( nested_folder, use_folds='all', checkpoint_name='checkpoint_best.pth' ) # Generate temp folder for intermediate data temp_folder_results = os.path.join(temp_dir,"temp_results") temp_folder_data_to_test = os.path.join(temp_dir,"temp_results",'data_to_test') os.makedirs(temp_folder_data_to_test, exist_ok=True) # Save arrays as nifti (.nii) affine = np.eye(4) nii_out_ph = nib.Nifti1Image(input_array[...,0], affine) nib.save(nii_out_ph, os.path.join(temp_folder_data_to_test, 'Dixon_999_0000.nii.gz')) nii_in_ph = nib.Nifti1Image(input_array[...,1], affine) nib.save(nii_in_ph, os.path.join(temp_folder_data_to_test, 'Dixon_999_0001.nii.gz')) nii_water = nib.Nifti1Image(input_array[...,2], affine) nib.save(nii_water, os.path.join(temp_folder_data_to_test, 'Dixon_999_0002.nii.gz')) nii_fat = nib.Nifti1Image(input_array[...,3], affine) nib.save(nii_fat, os.path.join(temp_folder_data_to_test, 'Dixon_999_0003.nii.gz')) # Infere kidney masks predictor.predict_from_files( temp_folder_data_to_test, temp_folder_results, save_probabilities=False, overwrite=True ) # Load the NIfTI file nifti_file = nib.load(os.path.join(temp_folder_results,'Dixon_999.nii.gz')) output_array = nifti_file.get_fdata() if postproc == True: left_kidney, right_kidney = _kidney_masks(output_array) else: left_kidney=output_array[output_array == 2] left_kidney[left_kidney==2]=1 right_kidney=output_array[output_array == 1] kidneys = { "kidney_left": left_kidney, "kidney_right": right_kidney } if clear_cache: if verbose: print('Deleting downloaded files...') clear_cache_datafiles(temp_dir) return kidneys
def _largest_cluster(array:np.ndarray)->np.ndarray: """Given a mask array, return a new mask array containing only the largesr cluster. Args: array (np.ndarray): mask array with values 1 (inside) or 0 (outside) Returns: np.ndarray: mask array with only a single connect cluster of pixels. """ # Label all features in the array label_img, cnt = ndi.label(array) # Find the label of the largest feature labels = range(1,cnt+1) size = [np.count_nonzero(label_img==l) for l in labels] max_label = labels[size.index(np.amax(size))] # Return a mask corresponding to the largest feature return label_img==max_label def _kidney_masks(output_array:np.ndarray)->tuple: """Extract kidney masks from the output array of the UNETR Args: output_array (np.ndarray): 3D numpy array (x,y,z) with integer labels (0=background, 1=right kidney, 2=left kidney) Returns: tuple: A tuple of 3D numpy arrays (left_kidney, right_kidney) with masks for the kidneys. """ left_kidney = _largest_cluster(output_array == 2) right_kidney = _largest_cluster(output_array == 1) return left_kidney, right_kidney