Source code for miblab_dl.fatwatermap

"""
Compute water-dominance masks from data that have fat and water maps
"""

import os
import sys
import subprocess
import tempfile

import numpy as np
import nibabel as nib
from miblab import zenodo_fetch

if sys.version_info < (3, 9):
    # importlib.resources either doesn't exist or lacks the files()
    # function, so use the PyPI version:
    import importlib_resources
else:
    # importlib.resources has files(), so use that:
    import importlib.resources as importlib_resources


[docs] def fatwater(op_phase, in_phase, te_o=None, te_i=None, t2s_w=15, t2s_f=10): """Compute fat and water maps from opposed-phase and in-phase arrays Args: op_phase (np.ndarray): opposed phase data in_phase (np.ndarray): in-phase data model (str): path to the model files Returns: fat, water: numpy arrays of the same shape and type as the input arrays. """ print('Downloading model..') temp_dir = importlib_resources.files('miblab_dl.datafiles') model = zenodo_fetch("FatWaterPredictor.zip", temp_dir, "17791059", extract=True) print('Predicting fat and water images..') waterdom = _predict_mask_numpy(model, op_phase, in_phase) fat, water = _compute_fatwater(waterdom, op_phase, in_phase, te_o, te_i, t2s_w, t2s_f) fat[fat < 0] = 0 water[water < 0] = 0 return fat, water
def _predict_mask_numpy(model, op_phase, in_phase): with tempfile.TemporaryDirectory() as input_folder: # Save numpy arrays as nifti case_id = "dixon" file_op = os.path.join(input_folder, f"{case_id}_0000.nii.gz") file_ip = os.path.join(input_folder, f"{case_id}_0001.nii.gz") nifti_op = nib.Nifti1Image(op_phase, np.eye(4)) nifti_ip = nib.Nifti1Image(in_phase, np.eye(4)) nib.save(nifti_op, file_op) nib.save(nifti_ip, file_ip) with tempfile.TemporaryDirectory() as output_folder: # Create predictions in a temporary output_folder _predict_mask_folder(model, input_folder, output_folder) # Return result as binary numpy array mask_file = os.path.join(output_folder, f"{case_id}.nii.gz") waterdom = nib.load(mask_file).get_fdata().astype(np.int8) return waterdom def _predict_mask_folder(model, input_folder, output_folder): # These two variables are not used but we are setting to a # dummy value to silence the warnings os.environ["nnUNet_raw"] = os.getcwd() os.environ["nnUNet_preprocessed"] = os.getcwd() # Folder containing the model weights os.environ["nnUNet_results"] = model with tempfile.TemporaryDirectory() as predictions: # Predict and save results in a temporary folder cmd = f"nnUNetv2_predict -d Dataset001_FatWaterPredictor -i {input_folder} -o {predictions} -f 0 1 2 3 4 -tr nnUNetTrainer -c 3d_fullres -p nnUNetPlans" process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, encoding="utf-8", # <-- force UTF-8 decoding errors="replace" # <-- avoids crash if weird bytes appear ) # Stream logs in real-time for line in process.stdout: print(line, end="") process.wait() # wait for completion # Run post-processing os.makedirs(output_folder, exist_ok=True) source = os.path.join(model, 'Dataset001_FatWaterPredictor', 'nnUNetTrainer__nnUNetPlans__3d_fullres', "crossval_results_folds_0_1_2_3_4") pproc = os.path.join(source, 'postprocessing.pkl') plans = os.path.join(source, 'plans.json') cmd = f"nnUNetv2_apply_postprocessing -i {predictions} -o {output_folder} -pp_pkl_file {pproc} -np 8 -plans_json {plans}" process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, encoding="utf-8", # <-- force UTF-8 decoding errors="replace" # <-- avoids crash if weird bytes appear ) # Stream logs in real-time for line in process.stdout: print(line, end="") process.wait() # wait for completion def _compute_fatwater(waterdom, op_phase, in_phase, te_o, te_i, t2s_w, t2s_f): if te_o is None: Eof, Eif, Eow, Eiw = 1, 1, 1, 1 else: Eof = np.exp(-te_o/t2s_f) Eif = np.exp(-te_i/t2s_f) Eow = np.exp(-te_o/t2s_w) Eiw = np.exp(-te_i/t2s_w) Efatdom = np.array([[Eof, -Eow], [Eif, Eiw]]) Ewatdom = np.array([[-Eof, Eow], [Eif, Eiw]]) Efatdom_inv = np.linalg.inv(Efatdom) Ewatdom_inv = np.linalg.inv(Ewatdom) fat, water = _apply_pixelwise_matrix(op_phase, in_phase, waterdom, Efatdom_inv, Ewatdom_inv) return fat, water def _apply_pixelwise_matrix(a, b, mask, M0, M1) -> np.ndarray: """ For each pixel/voxel combine [a, b] as a 2-vector v and compute: result = M0 @ v if mask == 0/False result = M1 @ v if mask == 1/True Returns arrays c, d of same shape and type Parameters ---------- a, b : np.ndarray Input 3D arrays of the same shape (spatial). mask : np.ndarray Boolean/0-1 array same shape as `a`/`b`. True selects M1. M0, M1 : array-like (2x2) Two 2x2 matrices. Returns ------- np.ndarray Output 3D arrays of the same shape (spatial). """ # stack components into last axis: shape (..., 2) v = np.stack((a, b), axis=-1).astype(float) # shape (Z, Y, X, 2) # compute results for both matrices: result = v @ M.T (vector @ M.T => M @ v per-voxel) res0 = v @ M0.T # shape (..., 2) res1 = v @ M1.T # Select based on mask; expand mask to last axis mask_bool = np.asarray(mask, dtype=bool) mask_expanded = mask_bool[..., None] # shape (..., 1) result = np.where(mask_expanded, res1, res0) # shape (..., 2) return result[...,0], result[...,1]