# Source code for unravel.cluster_stats.org_data

#!/usr/bin/env python3

"""
Use ``cstats_org_data`` (``cod``) from UNRAVEL to aggregate and organize csv outputs from ``cstats_validation``.

Prereqs:
    - ``cstats_validation`` (``cstats_val``) to generate the cluster validation output dirs and CSVs

Inputs: 
    - clusters/cluster_validation_results_`*` (glob pattern matching ``cstats_validation`` output dirs)
    - CSVs with validation metric data (e.g., cell_density_data.csv, label_density_data.csv, mean_in_cluster_data.csv, or mean_in_seg_in_cluster_data.csv from ``cstats_validation``)
    - Optional: path/vstats to copy p val, info, and index files

Outputs:
    - target_dir/<cluster_validation_results_* >/sample??__<metric>_data__<cluster_validation_results_* >.csv
    - or, with --by_subregion:
      target_dir/<cluster_validation_results_* >/sample??__<metric>_by_subregion_data__<cluster_validation_results_* >.csv

Notes:
    - If the cluster_validation_results_`*` directory name contains "_gt_" or "_lt_", the script will attempt to replace it with "_v_" to match the vstats directory.
    - This is useful when non-directional maps were made as directional.
    - If the cluster_validation_results_`*` directory name contains "_LH" or "_RH", the script will attempt to remove it to match the vstats directory.    

Usage
-----
    cstats_org_data -cvd '<asterisk>' -me <metric> [-bsr] [-vd path/vstats_dir] [-td target_dir] [-pvt p_value_threshold.txt] [-d list of paths] [-p sample??] [-v]
"""

import shutil
from pathlib import Path
from rich import print
from rich.traceback import install

from unravel.core.help_formatter import RichArgumentParser, SuppressMetavar, SM

from unravel.core.config import Configuration 
from unravel.core.utils import log_command, match_files, verbose_start_msg, verbose_end_msg, get_samples, print_func_name_args_times


def parse_args():
    """Build the ``cstats_org_data`` command-line parser and parse the arguments.

    The module docstring is handed to the parser (``docstring=__doc__``).

    Returns:
        The namespace returned by ``parser.parse_args()``, with attributes
        ``cluster_val_dirs``, ``metric``, ``vstats_path``, ``target_dir``,
        ``p_val_txt``, ``by_subregion``, ``dirs``, ``pattern``, and ``verbose``.
    """
    parser = RichArgumentParser(formatter_class=SuppressMetavar, add_help=False, docstring=__doc__)

    reqs = parser.add_argument_group('Required arguments')
    # nargs='+' matches the help text ("One or more"): a bare ``-cvd`` with no
    # patterns is rejected up front instead of silently organizing nothing.
    reqs.add_argument('-cvd', '--cluster_val_dirs', help='One or more glob patterns matching cluster validation output dirs to copy data from (relative to ./sample??/clusters/)', nargs='+', required=True, action=SM)
    # --metric is required, so it is listed with the required arguments in the help output.
    reqs.add_argument('-me', '--metric', help='Metric output from cstats_validation to aggregate (e.g., cell_density, label_density, mean_in_cluster, mean_in_seg_in_cluster)', required=True, action=SM)

    opts = parser.add_argument_group('Optional args')
    opts.add_argument('-vd', '--vstats_path', help='path/vstats_dir (the dir ``vstats`` was run from) to copy p val, info, and index files if provided', default=None, action=SM)
    opts.add_argument('-td', '--target_dir', help='path/dir to copy results. If omitted, copy data to the cwd', default=None, action=SM)
    opts.add_argument('-pvt', '--p_val_txt', help='Name of the file w/ the corrected p value thresh (e.g., from ``cstats_fdr``). Default: p_value_threshold.txt', default='p_value_threshold.txt', action=SM)
    opts.add_argument('-bsr', '--by_subregion', help='Copy <metric>_by_subregion_data.csv instead of <metric>_data.csv.', action='store_true', default=False)

    general = parser.add_argument_group('General arguments')
    general.add_argument('-d', '--dirs', help='Paths to sample?? dirs and/or dirs containing them (space-separated) for batch processing. Default: current dir', nargs='*', default=None, action=SM)
    general.add_argument('-p', '--pattern', help='Pattern for directories to process. Default: sample??', default='sample??', action=SM)
    general.add_argument('-v', '--verbose', help='Increase verbosity. Default: False', action='store_true', default=False)

    return parser.parse_args()
def resolve_cluster_correction_dir(validation_dir_name):
    """Derive the stats-directory name (and hemisphere) from a validation dir name.

    The name is normalized in three steps:
        1. "_gt_" / "_lt_" are replaced with "_v_".
        2. Every "_rev_cluster_index" occurrence is removed.
        3. A trailing "_LH" or "_RH" is stripped and reported separately.

    Args:
        - validation_dir_name (str): name of the cluster validation directory

    Returns:
        - tuple: (normalized name, hemi) where hemi is 'LH', 'RH', or None
    """
    stem = str(validation_dir_name)

    # Directional contrasts map onto the non-directional "_v_" naming
    for directional_token in ('_gt_', '_lt_'):
        stem = stem.replace(directional_token, '_v_')

    # Drop the validation-output suffix
    stem = stem.replace('_rev_cluster_index', '')

    # The hemisphere suffix is only stripped for locating the parent stats dir
    if not stem.endswith(('_LH', '_RH')):
        return stem, None
    return stem[:-3], stem[-2:]
def find_matching_directory(base_path, long_name):
    """Find a sub-directory of ``base_path`` whose name overlaps with ``long_name``.

    A directory matches when its name is a substring of ``long_name`` or
    ``long_name`` is a substring of its name. Candidates are checked in sorted
    order so the result does not depend on filesystem listing order.

    Args:
        - base_path (Path or str): the directory to search (non-recursive)
        - long_name (str): the name to compare directory names against

    Returns:
        - str or None: the name of the first matching directory, or None if
          ``base_path`` is not a directory or nothing matches
    """
    base_path = Path(base_path)
    # iterdir() raises on a missing path; "no match" is the useful answer for callers
    if not base_path.is_dir():
        return None

    long_name = str(long_name)
    candidates = sorted(entry.name for entry in base_path.iterdir() if entry.is_dir())
    return next(
        (candidate for candidate in candidates if candidate in long_name or long_name in candidate),
        None,
    )
def cp(src, dest):
    """Copy a file to ``dest``, creating the destination's parent dirs as needed.

    If ``src`` does not exist, nothing is created or copied and no error is raised.

    Args:
        - src (Path): the source path
        - dest (Path): the destination path
    """
    if not Path(src).exists():
        return

    parent_dir = Path(dest).parent
    parent_dir.mkdir(parents=True, exist_ok=True)
    shutil.copy(src, dest)
def _copy_if_missing(src, dest):
    """Copy ``src`` to ``dest`` unless ``dest`` already exists.

    Returns:
        - bool: False if ``src`` does not exist (nothing is copied), True otherwise
    """
    if not src.exists():
        return False
    if not dest.exists():
        cp(src=src, dest=dest)
    return True


def copy_stats_files(validation_dir, dest_path, vstats_path, p_val_txt):
    """Copy the cluster info, p value threshold, and rev_cluster_index files to the target directory.

    The source directory is ``<vstats_path>/stats/<name>``, where ``<name>`` is
    derived from the validation directory name by
    ``resolve_cluster_correction_dir``. If that directory does not exist, a
    directory in ``<vstats_path>/stats`` with an overlapping name is used instead
    (see ``find_matching_directory``). Files already present in ``dest_path``
    are not overwritten. Nothing is done if ``vstats_path`` does not exist.

    Args:
        - validation_dir (Path): the path to the validation directory
        - dest_path (Path): the path to the new directory
        - vstats_path (Path): the path to the vstats directory
        - p_val_txt (str): the name of the file with the corrected p value threshold

    Raises:
        - SystemExit: with status 1 if the stats directory for this validation dir
          or its rev_cluster_index.nii.gz cannot be found. A missing cluster info
          or p value threshold file only prints a warning.
    """
    import sys  # Local import: only needed for the fatal-error exits below

    vstats_path = Path(vstats_path)
    if not vstats_path.exists():
        return

    cluster_correction_dir, hemi = resolve_cluster_correction_dir(validation_dir.name)

    # Construct the path and fall back to a fuzzy name match if it is absent
    stats_dir = vstats_path / 'stats'
    cluster_correction_path = stats_dir / cluster_correction_dir
    if not cluster_correction_path.exists() and stats_dir.is_dir():
        matched_dir = find_matching_directory(stats_dir, cluster_correction_dir)
        if matched_dir is not None:
            cluster_correction_dir = matched_dir
            cluster_correction_path = stats_dir / cluster_correction_dir

    if not cluster_correction_path.exists():
        print(f'\n [red]Path for rev_cluster_index.nii.gz, {p_val_txt}, and _cluster_info.txt does not exist: {cluster_correction_path}\n')
        sys.exit(1)  # Non-zero status so shells and pipelines see the failure

    cluster_info = cluster_correction_path / f'{cluster_correction_dir}_cluster_info.txt'
    if not _copy_if_missing(cluster_info, dest_path / cluster_info.name):
        print(f'\n [red]The cluster_info.txt ({cluster_info}) does not exist\n')

    p_val_thresh_file = cluster_correction_path / p_val_txt
    if not _copy_if_missing(p_val_thresh_file, dest_path / p_val_txt):
        print(f'\n [red]The p value threshold txt ({p_val_thresh_file}) does not exist\n')

    # Handle rev_cluster_index.nii.gz with optional hemisphere suffix
    hemi_suffix = f'_{hemi}' if hemi else ''
    rev_cluster_index_path = cluster_correction_path / f"{cluster_correction_path.name}_rev_cluster_index{hemi_suffix}.nii.gz"
    if not _copy_if_missing(rev_cluster_index_path, dest_path / rev_cluster_index_path.name):
        print(f'\n [red]The rev_cluster_index.nii.gz ({rev_cluster_index_path}) does not exist\n')
        sys.exit(1)
@print_func_name_args_times()
def organize_validation_data(sample_path, clusters_path, validation_dir_pattern, metric, target_dir, vstats_path, p_val_txt, by_subregion=False):
    """Copy the cluster validation, p value, cluster info, and rev_cluster_index files to the target directory.

    For every directory matched in ``clusters_path``, the metric CSV is copied to
    ``target_dir/<validation_dir>/<sample>__<metric>_data__<validation_dir>.csv``
    (or the ``_by_subregion_data`` variant). Existing destination CSVs are not
    overwritten. If ``vstats_path`` is given, the matching stats files are copied
    alongside via ``copy_stats_files``.

    Args:
        - sample_path (Path): the path to the sample directory
        - clusters_path (Path): the path to the clusters directory
        - validation_dir_pattern (str or list of str): one or more patterns to match the validation directories
        - metric (str): the type of metric data to aggregate (e.g., cell_density, label_density, mean_in_cluster, mean_in_seg_in_cluster)
        - target_dir (Path): the path to the target directory
        - vstats_path (Path or None): the path to the vstats directory (stats files are skipped if None)
        - p_val_txt (str): the name of the file with the corrected p value threshold
        - by_subregion (bool): whether to copy <metric>_by_subregion_data.csv instead of <metric>_data.csv
    """
    # A bare string would otherwise be iterated character by character
    if isinstance(validation_dir_pattern, str):
        patterns = [validation_dir_pattern]
    else:
        patterns = validation_dir_pattern

    matches = []
    for pattern in patterns:
        matches.extend(match_files(pattern, clusters_path))

    # De-dupe while preserving order (overlapping patterns can match the same dir)
    validation_dirs = list(dict.fromkeys(matches))

    # These do not depend on the validation dir, so compute them once
    suffix = '_by_subregion_data.csv' if by_subregion else '_data.csv'
    src_csv_name = f'{metric}{suffix}'
    dest_csv_stem = f'{sample_path.name}__{metric}{suffix[:-4]}'  # suffix without ".csv"

    for validation_dir in validation_dirs:
        if not validation_dir.is_dir():
            continue

        dest_path = target_dir / validation_dir.name
        dest_path.mkdir(parents=True, exist_ok=True)

        src_csv = validation_dir / src_csv_name
        if src_csv.exists():
            dest_csv = dest_path / f'{dest_csv_stem}__{validation_dir.name}.csv'
            if not dest_csv.exists():
                cp(src=src_csv, dest=dest_csv)
        else:
            print(f'\n [red]The expected csv ({src_csv}) does not exist\n')

        if vstats_path is not None:
            copy_stats_files(validation_dir=validation_dir, dest_path=dest_path, vstats_path=vstats_path, p_val_txt=p_val_txt)
@log_command
def main():
    """Entry point: parse the CLI arguments and organize validation data for each sample.

    Results go to ``--target_dir`` (resolved to an absolute path) or to the
    current working directory when it is omitted. Samples without a
    ``clusters`` directory are skipped.
    """
    install()
    args = parse_args()
    Configuration.verbose = args.verbose
    verbose_start_msg()

    if args.target_dir:
        target_dir = Path(args.target_dir).resolve()
    else:
        target_dir = Path.cwd()
    target_dir.mkdir(exist_ok=True, parents=True)

    for sample_path in get_samples(args.dirs, args.pattern, args.verbose):
        clusters_path = sample_path / 'clusters'
        if not clusters_path.exists():
            continue

        organize_validation_data(
            sample_path=sample_path,
            clusters_path=clusters_path,
            validation_dir_pattern=args.cluster_val_dirs,
            metric=args.metric,
            target_dir=target_dir,
            vstats_path=args.vstats_path,
            p_val_txt=args.p_val_txt,
            by_subregion=args.by_subregion,
        )

    verbose_end_msg()
# Run the CLI only when this file is executed as a script, not when it is imported
if __name__ == '__main__':
    main()