#!/usr/bin/env python3
"""
This module contains functions for loading and saving 3D images.
Main Functions:
---------------
- load_3D_img: Load a 3D image from a .czi, .nii.gz, or .tif series and return the ndarray.
- save_3D_img: Save a 3D image as a .nii.gz, .tif series, .h5, or .zarr file.
Helper Functions:
-----------------
- extract_resolution
- load_image_metadata_from_txt
- save_metadata_to_file
- metadata
- return_3D_img
"""
import json
import os
import re
import cv2
import dask.array as da
import h5py
import nibabel as nib
import numpy as np
import tifffile
import xml.etree.ElementTree as ET
import zarr
from aicspylibczi import CziFile
from concurrent.futures import ThreadPoolExecutor
from glob import glob
from lxml import etree
from pathlib import Path
from rich import print
from unravel.core.utils import match_files, print_func_name_args_times
# TODO: save_as_nii() add logic for using the reference image for dtype (e.g., if reference is provided and dtype is None, use reference dtype)
# TODO: Add support for extracting metadata from .zarr files.
[docs]
@print_func_name_args_times()
def return_3D_img(ndarray, return_metadata=False, return_res=False, xy_res=None, z_res=None, x_dim=None, y_dim=None, z_dim=None):
"""
Return the 3D image ndarray and optionally resolutions (xy_res, z_res) or metadata (xy_res, z_res, x_dim, y_dim, z_dim).
Parameters
----------
ndarray : ndarray
The 3D image array.
return_metadata : bool, optional
Whether to return metadata. Default is False.
return_res : bool, optional
Whether to return resolutions. Default is False.
xy_res : float, optional
The resolution in the xy-plane.
z_res : float, optional
The resolution in the z-plane.
x_dim : int, optional
The size of the image in the x-dimension.
y_dim : int, optional
The size of the image in the y-dimension.
z_dim : int, optional
The size of the image in the z-dimension.
Returns
-------
ndarray
The 3D image array.
tuple, optional
If return_res is True, returns (ndarray, xy_res, z_res).
tuple, optional
If return_metadata is True, returns (ndarray, xy_res, z_res, x_dim, y_dim, z_dim).
"""
if return_metadata:
return ndarray, xy_res, z_res, x_dim, y_dim, z_dim
elif return_res:
return ndarray, xy_res, z_res
return ndarray
[docs]
@print_func_name_args_times()
def load_czi(czi_path, channel=0, desired_axis_order="xyz", return_res=False, return_metadata=False, save_metadata=None, xy_res=None, z_res=None):
"""
Load a .czi image and return the ndarray.
Parameters
----------
czi_path : str
The path to the .czi file.
channel : int, optional
The channel to load. Default is 0.
desired_axis_order : str, optional
The desired order of the image axes. Default is 'xyz'.
return_res : bool, optional
Whether to return resolutions. Default is False.
return_metadata : bool, optional
Whether to return metadata. Default is False.
save_metadata : str, optional
Path to save metadata file. Default is None.
xy_res : float, optional
The resolution in the xy-plane.
z_res : float, optional
The resolution in the z-plane.
Returns
-------
ndarray
The loaded 3D image array.
tuple, optional
If return_res is True, returns (ndarray, xy_res, z_res).
tuple, optional
If return_metadata is True, returns (ndarray, xy_res, z_res, x_dim, y_dim, z_dim).
"""
czi = CziFile(czi_path)
ndarray = np.squeeze(czi.read_image(C=channel)[0])
if ndarray.ndim == 4:
print(f"\n[red1].czi channel {channel} has 4 axes. Please stitch tiles from {Path(czi_path).name}\n")
import sys ; sys.exit()
ndarray = np.transpose(ndarray, (2, 1, 0)) if desired_axis_order == "xyz" else ndarray
xy_res, z_res, x_dim, y_dim, z_dim = metadata(czi_path, ndarray, return_res, return_metadata, xy_res, z_res, save_metadata)
return return_3D_img(ndarray, return_metadata, return_res, xy_res, z_res, x_dim, y_dim, z_dim)
[docs]
@print_func_name_args_times()
def load_tifs(tif_dir_path, desired_axis_order="xyz", return_res=False, return_metadata=False, save_metadata=None, xy_res=None, z_res=None, parallel_loading=True):
"""
Load a series of .tif images and return the ndarray.
Parameters
----------
tif_dir_path : str
The path to the directory containing the .tif files.
desired_axis_order : str, optional
The desired order of the image axes. Default is 'xyz'.
return_res : bool, optional
Whether to return resolutions. Default is False.
return_metadata : bool, optional
Whether to return metadata. Default is False.
save_metadata : str, optional
Path to save metadata file. Default is None.
xy_res : float, optional
The resolution in the xy-plane.
z_res : float, optional
The resolution in the z-plane.
parallel_loading : bool, optional
Whether to load images in parallel. Default is True.
Returns
-------
ndarray
The loaded 3D image array.
tuple, optional
If return_res is True, returns (ndarray, xy_res, z_res).
tuple, optional
If return_metadata is True, returns (ndarray, xy_res, z_res, x_dim, y_dim, z_dim).
"""
def load_single_tif(tif_file):
"""Load a single .tif file using OpenCV and return the ndarray."""
img = cv2.imread(str(tif_file), cv2.IMREAD_UNCHANGED)
if img is None:
raise ValueError(f"Failed to load image: {tif_file}")
return img
tif_files = match_files('*.tif', base_path=tif_dir_path)
if parallel_loading:
with ThreadPoolExecutor() as executor:
tifs_stacked = list(executor.map(load_single_tif, tif_files))
else:
tifs_stacked = []
for tif_file in tif_files:
tifs_stacked.append(load_single_tif(tif_file))
ndarray = np.stack(tifs_stacked, axis=0)
ndarray = np.transpose(ndarray, (2, 1, 0)) if desired_axis_order == "xyz" else ndarray
xy_res, z_res, x_dim, y_dim, z_dim = metadata(tif_files[0], ndarray, return_res, return_metadata, xy_res, z_res, save_metadata)
return return_3D_img(ndarray, return_metadata, return_res, xy_res, z_res, x_dim, y_dim, z_dim)
[docs]
def load_3D_tif(tif_path, desired_axis_order="xyz", return_res=False, return_metadata=False, save_metadata=None, xy_res=None, z_res=None):
"""
Load a 3D .ome.tif or .tif image and return the ndarray.
Parameters
----------
tif_path : str
The path to the .ome.tif file.
desired_axis_order : str, optional
The desired order of the image axes. Default is 'xyz'.
return_res : bool, optional
Whether to return resolutions (works for .ome.tif). Default is False.
Returns
-------
ndarray
The loaded 3D image array.
tuple, optional
If return_res is True, returns (ndarray, xy_res, z_res).
"""
with tifffile.TiffFile(tif_path) as tif:
print(f"\n Loading {tif_path} as ndarray")
ndarray = tif.asarray() # Load the image into memory
ndarray = np.transpose(ndarray, (2, 1, 0)) if desired_axis_order == "xyz" else ndarray
x_dim, y_dim, z_dim = ndarray.shape
# If resolution is requested, extract it from OME-XML metadata
if xy_res is None or z_res is None:
first_page = tif.pages[0]
description = first_page.tags[270].value
try:
# Remove the OME comment header to parse XML correctly
ome_xml = description.split('-->')[-1].strip()
root = ET.fromstring(ome_xml)
# Find the Pixels element and get PhysicalSizeX and PhysicalSizeZ
pixels = root.find(".//{http://www.openmicroscopy.org/Schemas/OME/2016-06}Pixels")
xy_res = float(pixels.attrib['PhysicalSizeX'])
z_res = float(pixels.attrib['PhysicalSizeZ'])
except (ET.ParseError, KeyError, TypeError) as e:
raise ValueError(f"\n Unable to parse OME-XML metadata: {e}\n")
if save_metadata is not None:
save_metadata_to_file(xy_res, z_res, x_dim, y_dim, z_dim, save_metadata=save_metadata)
return return_3D_img(ndarray, return_metadata, return_res, xy_res, z_res, x_dim, y_dim, z_dim)
[docs]
def nii_path_or_nii(nii):
"""Helper function to load a NIfTI image if it's a path. Returns the NIfTI image object.
Parameters:
-----------
nii : str, Path, or nib.Nifti1Image
Path to the NIfTI image file or a Nifti1Image object.
Returns:
--------
nib.Nifti1Image
The NIfTI image object.
"""
if isinstance(nii, nib.Nifti1Image):
return nii
elif isinstance(nii, (str, Path)) and Path(nii).exists():
return nib.load(nii)
else:
raise FileNotFoundError(f"\nInput file not found: {nii}\n")
[docs]
@print_func_name_args_times()
def nii_to_ndarray(nii):
"""Load a NIfTI image and return as a 3D ndarray.
Parameters:
-----------
nii : str, Path, or nib.Nifti1Image
Path to the NIfTI image file or a Nifti1Image object.
Returns:
--------
ndarray : ndarray
The 3D image array.
"""
nii = nii_path_or_nii(nii)
ndarray = np.asanyarray(nii.dataobj, dtype=nii.header.get_data_dtype()).squeeze()
return ndarray
[docs]
@print_func_name_args_times()
def load_nii(nii_path, desired_axis_order="xyz", return_res=False, return_metadata=False, save_metadata=None, xy_res=None, z_res=None):
"""
Load a .nii.gz image and return the ndarray.
Parameters
----------
nii_path : str
The path to the .nii.gz file.
desired_axis_order : str, optional
The desired order of the image axes. Default is 'xyz'.
return_res : bool, optional
Whether to return resolutions. Default is False.
return_metadata : bool, optional
Whether to return metadata. Default is False.
save_metadata : str, optional
Path to save metadata file. Default is None.
xy_res : float, optional
The resolution in the xy-plane (use if res is not specified in the metadata).
z_res : float, optional
The resolution in the z-plane.
Returns
-------
ndarray
The loaded 3D image array.
tuple, optional
If return_res is True, returns (ndarray, xy_res, z_res).
tuple, optional
If return_metadata is True, returns (ndarray, xy_res, z_res, x_dim, y_dim, z_dim).
Notes
-----
- If xy_res and z_res are provided, they will be used instead of the values from the metadata.
"""
ndarray = nii_to_ndarray(nii_path)
ndarray = np.transpose(ndarray, (2, 1, 0)) if desired_axis_order == "zyx" else ndarray
res_specified = True if xy_res is not None else False
if res_specified:
original_xy_res = xy_res
original_z_res = z_res
xy_res, z_res, x_dim, y_dim, z_dim = metadata(nii_path, ndarray, return_res, return_metadata, save_metadata=save_metadata)
if res_specified:
xy_res = original_xy_res
z_res = original_z_res
return return_3D_img(ndarray, return_metadata, return_res, xy_res, z_res, x_dim, y_dim, z_dim)
[docs]
@print_func_name_args_times()
def load_nii_subset(nii_path, xmin, xmax, ymin, ymax, zmin, zmax):
"""
Load a spatial subset of a .nii.gz file and return an ndarray.
Parameters
----------
nii_path : str
The path to the .nii.gz file.
xmin, xmax, ymin, ymax, zmin, zmax : int
The spatial coordinates defining the subset.
Returns
-------
ndarray
The loaded subset of the 3D image.
"""
proxy_img = nib.load(nii_path)
subset_array = proxy_img.dataobj[xmin:xmax, ymin:ymax, zmin:zmax]
return np.squeeze(subset_array)
[docs]
@print_func_name_args_times()
def load_h5(hdf5_path, desired_axis_order="xyz", return_res=False, return_metadata=False, save_metadata=None, xy_res=None, z_res=None):
"""
Load full resolution image from an HDF5 file (.h5) and return the ndarray.
Parameters
----------
hdf5_path : str
The path to the .h5 file.
desired_axis_order : str, optional
The desired order of the image axes. Default is 'xyz'.
return_res : bool, optional
Whether to return resolutions. Default is False.
return_metadata : bool, optional
Whether to return metadata. Default is False.
save_metadata : str, optional
Path to save metadata file. Default is None.
xy_res : float, optional
The resolution in the xy-plane.
z_res : float, optional
The resolution in the z-plane.
Returns
-------
ndarray
The loaded 3D image array.
tuple, optional
If return_res is True, returns (ndarray, xy_res, z_res).
tuple, optional
If return_metadata is True, returns (ndarray, xy_res, z_res, x_dim, y_dim, z_dim).
"""
with h5py.File(hdf5_path, 'r') as f:
full_res_dataset_name = next(iter(f.keys())) # Assumes first dataset = full res image
dataset = f[full_res_dataset_name]
print(f"\n Loading {full_res_dataset_name} as ndarray")
ndarray = dataset[:] # Load the full res image into memory (if not enough RAM, chunck data [e.g., w/ dask array])
ndarray = np.transpose(ndarray, (2, 1, 0)) if desired_axis_order == "xyz" else ndarray
xy_res, z_res, x_dim, y_dim, z_dim = metadata(hdf5_path, ndarray, return_res, return_metadata, save_metadata=save_metadata)
return return_3D_img(ndarray, return_metadata, return_res, xy_res, z_res, x_dim, y_dim, z_dim)
[docs]
@print_func_name_args_times()
def load_zarr(zarr_path, channel=0, desired_axis_order="xyz", return_res=False, return_metadata=False, save_metadata=None, xy_res=None, z_res=None, level=None, verbose=False):
"""
Load a channel and level of a Zarr image, optionally returning voxel resolution.
Parameters
----------
zarr_path : str or Path
Path to .zarr directory.
channel : int, optional
Channel index to load (for 4D data). If None, loads all channels.
desired_axis_order : str, optional
Desired output axis order (default: "xyz").
return_res : bool, optional
If True, returns voxel resolution in mm (xy_res, z_res) along with the image.
save_metadata : str, optional
Path to save metadata file. Default is None.
xy_res : float, optional
Resolution in the xy-plane (in microns). If None, will be extracted from metadata.
z_res : float, optional
Resolution in the z-plane (in microns). If None, will be extracted from metadata.
level : str or int, optional
Resolution level to load (default: highest).
verbose : bool
Print debug output.
Returns
-------
ndarray
3D numpy array from the zarr store, with shape (z, y, x) or (z, x, y) depending on desired_axis_order.
tuple, optional
If return_res is True, returns (ndarray, xy_res, z_res).
tuple, optional
If return_metadata is True, returns (ndarray, xy_res, z_res, x_dim, y_dim, z_dim).
"""
zarr_path = Path(zarr_path)
if not zarr_path.exists():
raise FileNotFoundError(f"Zarr path {zarr_path} does not exist.")
def log(msg):
if verbose:
print(msg)
# Load .zattrs
zattrs = {}
zattrs_path = zarr_path / ".zattrs"
if zattrs_path.exists():
with open(zattrs_path) as f:
zattrs = json.load(f)
level_str = str(level) if level is not None else None
xy_res = z_res = None
if "multiscales" in zattrs:
multiscale = zattrs["multiscales"][0]
axes = multiscale.get("axes", [])
datasets = multiscale.get("datasets", [])
# Infer highest-res level available on disk
level_str = str(level) if level is not None else None
if level_str is None and datasets:
dataset_dirs = [ds["path"] for ds in datasets if ds["path"] == "." or str(ds["path"]).isdigit()]
existing_dirs = [p for p in dataset_dirs if (zarr_path / p).exists()]
if not level_str and existing_dirs:
if "." in existing_dirs:
level_str = "."
elif existing_dirs:
level_str = str(min(map(int, existing_dirs)))
# Extract resolution for this level
dataset = next((d for d in datasets if str(d["path"]) == level_str), None)
if dataset:
transforms = dataset.get("coordinateTransformations", [])
if transforms:
scale = transforms[0].get("scale", [])
if len(scale) == len(axes):
res_dict = {axis["name"]: s for axis, s in zip(axes, scale)}
xy_res = res_dict.get("x", None)
z_res = res_dict.get("z", None)
# Convert to micrometers
xy_res = xy_res * 1e3 if xy_res is not None else None
z_res = z_res * 1e3 if z_res is not None else None
# Load image data
ndarray = None
if level_str: # If a level is specified or found, load that level
level_path = zarr_path / level_str
if level_path.exists():
log(f" Multi-resolution structure detected: loading level {level_str}")
ndarray = da.from_zarr(level_path).compute() # convert dask array to numpy array
else:
raise ValueError(f"Specified level {level_str} does not exist in {zarr_path}")
else: # Load flat format (.zattrs is missing or it does not match expected metadata structure)
log(" No compatible .zattrs metadata found. Loading flat zarr format.")
zarr_dataset = zarr.open(zarr_path, mode='r')
ndarray = np.array(zarr_dataset)
# Extract channel if specified (e.g., 0 for the first channel, 1 for the second, etc.)
log(f" Array shape ([C], Z, Y, X): {ndarray.shape}")
if ndarray.ndim == 4:
if channel is not None:
log(f" Extracted channel: {channel}")
ndarray = ndarray[channel]
else:
raise ValueError(f"Multiple channels found: {ndarray.shape[0]} channels. Please specify a channel index.")
if ndarray.ndim != 3:
raise ValueError(f"Expected 3D array, but got shape {ndarray.shape}")
# Transpose to desired axis order
if desired_axis_order == "xyz":
ndarray = np.transpose(ndarray, (2, 1, 0))
log(f" Array shape after transposing to (X, Y, Z): {ndarray.shape}")
xy_res, z_res, x_dim, y_dim, z_dim = metadata(zarr_path, ndarray, return_res, return_metadata, xy_res, z_res, save_metadata)
return return_3D_img(ndarray, return_metadata=return_metadata, return_res=return_res, xy_res=xy_res, z_res=z_res, x_dim=x_dim, y_dim=y_dim, z_dim=z_dim)
[docs]
def resolve_path(upstream_path, path_or_pattern, make_parents=True, is_file=True):
"""
Returns full path or Path(upstream_path, path_or_pattern) and optionally creates parent directories.
Parameters
----------
upstream_path : str
The base path.
path_or_pattern : str
The relative path or glob pattern.
make_parents : bool, optional
Whether to create parent directories if they don't exist. Default is True.
is_file : bool, optional
Whether the path is a file. Default is True.
Returns
-------
Path or None
The resolved path or None if not found.
"""
if Path(path_or_pattern).is_absolute():
if is_file:
Path(path_or_pattern).parent.mkdir(parents=True, exist_ok=True)
else:
Path(path_or_pattern).mkdir(parents=True, exist_ok=True)
return Path(path_or_pattern)
full_path = Path(upstream_path, path_or_pattern)
if full_path.exists():
return full_path
glob_matches = sorted(full_path.parent.glob(full_path.name))
if glob_matches:
return glob_matches[0] # Return the first match
# Make parent dirs for future output
if make_parents:
if is_file:
full_path.parent.mkdir(parents=True, exist_ok=True)
else:
full_path.mkdir(parents=True, exist_ok=True)
return full_path
return None
[docs]
@print_func_name_args_times()
def load_3D_img(img_path, channel=0, desired_axis_order="xyz", return_res=False, return_metadata=False, xy_res=None, z_res=None, save_metadata=None, verbose=False):
"""
Load a 3D image from various file formats and return the ndarray.
Parameters
----------
img_path : str
The path to the image file (.czi, .ome.tif, .tif, .nii.gz, .h5, .zarr) or directory w/ 2D .ome.tif or .tif files.
channel : int, optional
The channel to load. Default is 0.
desired_axis_order : str, optional
The desired order of the image axes. Default is 'xyz'.
return_res : bool, optional
Whether to return resolutions. Default is False.
return_metadata : bool, optional
Whether to return metadata. Default is False.
xy_res : float, optional
The resolution in the xy-plane.
z_res : float, optional
The resolution in the z-plane.
save_metadata : str, optional
Path to save metadata file. Default is None.
Returns
-------
ndarray
The loaded 3D image array.
tuple, optional
If return_res is True, returns (ndarray, xy_res, z_res).
tuple, optional
If return_metadata is True, returns (ndarray, xy_res, z_res, x_dim, y_dim, z_dim).
"""
# If file_path points to dir containing tifs, resolve path to first .tif
img_path = Path(img_path)
if img_path.is_dir() and not str(img_path).endswith('.zarr'):
tif_files = match_files('*.tif', base_path=img_path)
if tif_files:
return load_tifs(img_path, desired_axis_order, return_res, return_metadata, save_metadata, xy_res, z_res)
if not img_path.exists():
raise FileNotFoundError(f"\nNo compatible image files found at {img_path} for load_3D_img(). Use: .czi, .ome.tif, .tif, .nii.gz, .h5, .zarr")
# Load image based on file type and optionally return resolutions and dimensions
try:
if str(img_path).endswith('.czi'):
return load_czi(img_path, channel=channel, desired_axis_order=desired_axis_order, return_res=return_res, return_metadata=return_metadata, save_metadata=save_metadata, xy_res=xy_res, z_res=z_res)
elif str(img_path).endswith('.ome.tif') or str(img_path).endswith('.tif'):
return load_3D_tif(img_path, desired_axis_order, return_res, return_metadata, save_metadata, xy_res, z_res)
elif str(img_path).endswith('.nii.gz'):
return load_nii(img_path, desired_axis_order, return_res, return_metadata, save_metadata, xy_res, z_res)
elif str(img_path).endswith('.h5'):
return load_h5(img_path, desired_axis_order, return_res, return_metadata, save_metadata, xy_res, z_res)
elif str(img_path).endswith('.zarr'):
return load_zarr(img_path, channel=channel, desired_axis_order=desired_axis_order, return_res=return_res, return_metadata=return_metadata, save_metadata=save_metadata, xy_res=xy_res, z_res=z_res, verbose=verbose)
else:
raise ValueError(f"Unsupported file type: {img_path.suffix}. Supported file types: .czi, .ome.tif, .tif, .nii.gz, .h5")
except (FileNotFoundError, ValueError) as e:
print(f"\n [red bold]Error: {e}\n")
import sys; sys.exit()
# Save images
[docs]
@print_func_name_args_times()
def save_as_nii(ndarray, output, xy_res=1000, z_res=1000, data_type=None, reference=None):
"""
Save a numpy array as a .nii.gz image with the specified resolution and orientation, using a reference image if provided.
Parameters
----------
ndarray : ndarray
The numpy array to save as a NIFTI image.
output : str or Path
The file path to save the output image. '.nii.gz' is appended if not present.
xy_res : float, optional
XY-plane resolution in microns. Default is 1000.
z_res : float, optional
Z-axis resolution in microns. Default is 1000.
data_type : data-type, optional
Data type for the NIFTI image. Default is np.float32.
reference : str, Path, or nib.Nifti1Image, optional
Either a path to a reference .nii.gz file or a Nifti1Image object
to set orientation and resolution. If provided, `xy_res` and `z_res` are ignored. Default is None.
Notes
-----
- The function will automatically create parent directories if they do not exist.
- The affine transformation is constructed assuming RAS orientation if no reference is provided.
"""
output = Path(output).resolve()
output.parent.mkdir(parents=True, exist_ok=True)
if not str(output).endswith('.nii.gz'):
output = output.with_suffix('.nii.gz')
if reference is not None:
if isinstance(reference, (str, Path)):
ref_nii = nib.load(str(reference))
elif isinstance(reference, nib.Nifti1Image):
ref_nii = reference
else:
raise ValueError("\nReference for save_as_nii() must be a path to a .nii.gz file or a Nifti1Image object.\n")
affine = ref_nii.affine
header = ref_nii.header.copy()
if data_type is None:
data_type = header.get_data_dtype()
else:
# Create the affine matrix with the appropriate resolutions (converting microns to mm)
affine = np.diag([xy_res / 1000, xy_res / 1000, z_res / 1000, 1]) # RAS orientation
header = nib.Nifti1Header()
# Create and save the NIFTI image
nii = nib.Nifti1Image(ndarray, affine, header)
nii.header.set_data_dtype(data_type or np.float32)
nib.save(nii, output)
print(f"\n Output: [default bold]{output}")
[docs]
@print_func_name_args_times()
def save_as_tifs(ndarray, tif_dir_out=None, ndarray_axis_order="xyz", parallel=True, max_workers=None, verbose=False):
"""
Save an ndarray as a series of .tif images.
Parameters
----------
ndarray : ndarray
The 3D image array to save.
tif_dir_out : str or Path
The directory to save the .tif files.
ndarray_axis_order : str, optional
The order of the ndarray axes. Default is 'xyz'.
parallel : bool, optional
Whether to save slices in parallel. Default is False.
max_workers : int or None, optional
Number of threads to use for parallel saving. Defaults to os.cpu_count().
verbose : bool, optional
How to print the output directory. Default is False.
Returns
-------
None
"""
tif_dir_out = Path(tif_dir_out)
tif_dir_out.mkdir(parents=True, exist_ok=True)
if ndarray_axis_order == "xyz":
ndarray = np.transpose(ndarray, (2, 1, 0)) # to zyx
def save_slice(i):
slice_file_path = tif_dir_out / f"slice_{i:04d}.tif"
tifffile.imwrite(str(slice_file_path), ndarray[i])
if parallel:
with ThreadPoolExecutor(max_workers=max_workers or os.cpu_count()) as executor:
list(executor.map(save_slice, range(ndarray.shape[0])))
else:
for i in range(ndarray.shape[0]):
save_slice(i)
if verbose:
print(f" Output directory: [magenta]{tif_dir_out}")
else:
print(f"Output directory with tif series: [magenta]{tif_dir_out}")
[docs]
@print_func_name_args_times()
def save_as_zarr(ndarray, output_path=None, ndarray_axis_order="xyz", xy_res=None, z_res=None, verbose=False):
"""
Save a 3D ndarray to a .zarr file as well as OME-NGFF-compatible metadata.
Parameters
----------
ndarray : ndarray
The 3D image array to save.
output_path : str
The path to save the .zarr file.
ndarray_axis_order : str, optional
The order of the ndarray axes. Default is 'xyz'.
xy_res : float, optional
The voxel size in the XY plane (in mm).
z_res : float, optional
The voxel size in the Z direction (in mm).
"""
if ndarray_axis_order == "xyz":
ndarray = np.transpose(ndarray, (2, 1, 0))
if isinstance(ndarray, da.Array):
dask_array = ndarray
else:
dask_array = da.from_array(ndarray, chunks='auto')
compressor = zarr.Blosc(cname='lz4', clevel=9, shuffle=zarr.Blosc.BITSHUFFLE)
dask_array.to_zarr(output_path, compressor=compressor, overwrite=True)
if verbose:
print(f"\n Saved zarr as: [default bold]{output_path}")
# Add NGFF-style .zattrs
first_axis = "x" if ndarray_axis_order == "xyz" else "z"
third_axis = "z" if first_axis == "x" else "x"
if xy_res is not None or z_res is not None:
attrs = {
"multiscales": [{
"version": "0.4",
"axes": [
{"name": first_axis, "type": "space", "unit": "millimeter"},
{"name": "y", "type": "space", "unit": "millimeter"},
{"name": third_axis, "type": "space", "unit": "millimeter"},
],
"datasets": [{
"path": ".",
"coordinateTransformations": [{
"type": "scale",
"scale": [xy_res, xy_res, z_res]
}]
}]
}]
}
zarr.open(output_path, mode='a').attrs.update(attrs)
if verbose:
print(f" Added NGFF-style .zattrs with resolutions (in mm): xy_res={xy_res}, z_res={z_res}")
[docs]
@print_func_name_args_times()
def save_as_h5(ndarray, output_path, ndarray_axis_order="xyz"):
"""
Save an ndarray to an HDF5 file (.h5).
Parameters
----------
ndarray : ndarray
The 3D image array to save.
output_path : str
The path to save the .h5 file.
ndarray_axis_order : str, optional
The order of the ndarray axes. Default is 'xyz'.
Returns
-------
None
"""
if ndarray_axis_order == "xyz":
ndarray = np.transpose(ndarray, (2, 1, 0))
with h5py.File(output_path, 'w') as f:
f.create_dataset('data', data=ndarray, compression="lzf")
[docs]
@print_func_name_args_times()
def save_3D_img(img, output_path=None, ndarray_axis_order="xyz", xy_res=1000, z_res=1000, data_type=np.float32, reference_img=None, verbose=False):
"""
Save a 3D image in various formats.
Parameters
----------
img : ndarray
The 3D image array to save.
output_path : str
The path to save the image. The file extension determines the format: .nii.gz, .zarr, .h5, .tif, or a directory path for a TIFF series.
ndarray_axis_order : str, optional
The order of the ndarray axes. Default is 'xyz'.
xy_res : float, optional
The x/y resolution in microns for a NIFTI image output. Default is 1000.
z_res : float, optional
The z resolution in microns for a NIFTI image output. Default is 1000.
data_type : data-type, optional
Data type for a NIFTI image output. Default is np.float32.
reference_img : str, Path, or a nib.Nifti1Image object, optional
Either path to a .nii.gz file or a Nifti1Image object to set orientation and resolution. Default is None. If provided, xy_res and z_res are ignored.
verbose : bool, optional
Print addtional infomation. Default is False.
"""
output = Path(output_path)
output.parent.mkdir(exist_ok=True, parents=True)
output_str = str(output).lower()
suffix = output.suffix.lower()
if output_str.endswith('.nii.gz'):
save_as_nii(img, output, xy_res, z_res, data_type=data_type, reference=reference_img)
elif suffix == '.zarr':
save_as_zarr(img, output, ndarray_axis_order=ndarray_axis_order, xy_res=xy_res, z_res=z_res, verbose=verbose)
elif suffix == '.h5':
save_as_h5(img, output, ndarray_axis_order=ndarray_axis_order)
elif suffix == '.tif':
# If user gave a file path like red2/slice_0000.tif, use the parent directory
save_as_tifs(img, tif_dir_out=output.parent, ndarray_axis_order=ndarray_axis_order, parallel=True, max_workers=None, verbose=verbose)
elif suffix == '':
# If no suffix, assume directory path for TIFFs
save_as_tifs(img, tif_dir_out=output, ndarray_axis_order=ndarray_axis_order, parallel=True, max_workers=None, verbose=verbose)
else:
raise ValueError(f"Unsupported file type for save_3D_img(): '{suffix}'. Use: .nii.gz, .zarr, .h5, .tif, or a directory path for a TIFF series.")
# TODO: Function to extract a resolution level from a Zarr file (return as ndarray)
[docs]
@print_func_name_args_times()
def zarr_level_to_tifs(zarr_path, output_dir, channel_map, resolution_level=None, xy_res=None, z_res=None):
"""
Extracts a specified resolution level from a Zarr file and saves the specified channels as TIFF files.
Parameters:
-----------
zarr_path : str or Path
Path to the Zarr file.
output_dir : str or Path
Directory to save the output TIFF files. If None, defaults to "TIFFs/<sample_name>".
resolution_level : str
Resolution level to extract (e.g., "0", "1", ..., "9").
channel_map : dict
Mapping of output directory names to Zarr channel indices (e.g., {'red': 0, 'green': 1}).
xy_res : float, optional
X/Y resolution in microns.
z_res : float, optional
Z resolution in microns. Default is 100 µm for Genetic Tools Atlas data.
"""
# Check what resolution levels are available in the Zarr file
zarr_path = Path(zarr_path)
if not zarr_path.exists():
raise FileNotFoundError(f"Zarr input path does not exist: {zarr_path}")
else:
print(f"Zarr input path exists: {zarr_path}")
levels = [str(level) for level in zarr_path.iterdir() if level.is_dir()]
if len(levels) == 1:
resolution_level = levels[0]
elif resolution_level is None:
raise ValueError(f"Multiple resolution levels found in {zarr_path.name}. Please specify a resolution level. Available levels: {levels}")
if not zarr_path.is_dir():
raise NotADirectoryError(f"Zarr input path is not a directory: {zarr_path}")
if not (zarr_path / resolution_level).is_dir():
raise FileNotFoundError(f"Resolution level {resolution_level} not found in Zarr file: {zarr_path}")
z = zarr.open(zarr_path, mode="r")
z_level = z[resolution_level]
if z_level.ndim != 4:
raise ValueError(f"Expected shape (c, z, y, x), got {z_level.shape}")
num_channels = z_level.shape[0]
if num_channels == 1:
print(f"⚠️ Only one channel found in {zarr_path.name}")
print(f"📂 Processing {zarr_path.name} at resolution level {resolution_level}...")
for name, idx in channel_map.items():
out_dir = output_dir / name
tif_files = match_files('*.tif', base_path=out_dir)
if tif_files:
print(f"⚠️ Skipping {name} in {zarr_path.name}: output TIFFs already exist at {out_dir}")
continue
if idx >= num_channels:
print(f"⚠️ Channel index {idx} not found in {zarr_path.name} (only {num_channels} channels). Skipping {name}.")
continue
save_as_tifs(z_level[idx], out_dir, ndarray_axis_order="zyx", parallel=True)
save_metadata_to_file(
xy_res=xy_res,
z_res=z_res,
x_dim=z_level[idx].shape[2],
y_dim=z_level[idx].shape[1],
z_dim=z_level[idx].shape[0],
save_metadata=Path(out_dir).parent / "parameters" / "metadata.txt"
)
# Other functions
[docs]
def nii_voxel_size(nii, use_um=True):
"""Get the resolution (voxel size) of a NIfTI image.
Parameters:
-----------
nii : str, Path, or nib.Nifti1Image
Path to the NIfTI image file or a Nifti1Image object.
use_um : bool, optional. Default is True.
If True, return the resolution in micrometers (um). If False, return the resolution in millimeters (mm).
Returns:
--------
res : tuple of float or float
If anisotropic, returns (x_res, y_res, z_res) in micrometers (um) or millimeters (mm).
If isotropic, returns a single float value for the resolution in micrometers (um) or millimeters (mm).
"""
nii = nii_path_or_nii(nii)
res = nii.header.get_zooms() # (x, y, z) in mm
if use_um:
res = tuple([r * 1000 for r in res]) # Convert to micrometers
# Return as a single value if isotropic, else as a tuple
return res[0] if res[0] == res[1] == res[2] else res