#!/usr/bin/env python3
Use ``rstats_summary`` (``rss``) from UNRAVEL to plot cell densensities for each region and summarize results.
- CSVs with cell densities for each region (e.g., regional_stats/<condition>_sample??_cell_densities.csv)
- Input CSV columns: Region_ID, Side, ID_Path, Region, Abbr, <OneWordCondition>_sample??
- The <OneWordCondition>_sample?? column has the cell densities for each region.
- Saved to ./<test_type>_plots_<side>
- Plots for each region with cell densities for each group (e.g., Saline, MDMA, Meth)
- Summary of significant differences between groups
- regional_cell_densities_all.csv (Columns: columns: Region_ID,Side,Name,Abbr,Saline_sample06,Saline_sample07,...,MDMA_sample01,...,Meth_sample23,...)
- Example hex code list (flank arg w/ double quotes): ['#2D67C8', '#27AF2E', '#D32525', '#7F25D3']
- Default csv: UNRAVEL/unravel/core/csvs/CCFv3-2020_regional_summary.csv
- It has columns: Region_ID, ID_Path, Region, Abbr, General_Region, R, G, B
- Alternatively, use CCFv3-2017_regional_summary.csv or provide a custom CSV with the same columns.
Usage for Tukey tests:
rstats_summary --groups Saline MDMA Meth -hemi both [-div 10000] [-y cell_density] [-csv CCFv3-2020_regional_summary.csv] [-b ABA] [-s light:white] [-o tukey_plots] [-e pdf] [-v]
Usage for t-tests:
rstats_summary --groups Saline MDMA -hemi both -c Saline [-alt two-sided] [-div 10000] [-y cell_density] [-csv CCFv3-2020_regional_summary.csv] [-b ABA] [-s light:white] [-o t-test_plots] [-e pdf] [-v]
import ast
import os
from pathlib import Path
import re
import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import textwrap
from rich import print
from rich.live import Live
from rich.traceback import install
from scipy.stats import ttest_ind
from statsmodels.stats.multicomp import pairwise_tukeyhsd
from unravel.core.help_formatter import RichArgumentParser, SuppressMetavar, SM
from unravel.core.config import Configuration
from unravel.core.utils import log_command, verbose_start_msg, verbose_end_msg, initialize_progress_bar
def parse_args():
parser = RichArgumentParser(formatter_class=SuppressMetavar, add_help=False, docstring=__doc__)
reqs = parser.add_argument_group('Required arguments')
reqs.add_argument('--groups', nargs='*', help='Group prefixes (e.g., saline meth mdma)', required=True, action=SM)
reqs.add_argument('-hemi', help="Hemisphere(s) to process (r, l or both)", choices=['r', 'l', 'both'], required=True, action=SM)
opts = parser.add_argument_group('Optional arguments')
opts.add_argument('-c', '--ctrl_group', help="Control group name for t-test or Dunnett's tests", action=SM) # Does the control need to be specified for a t-test? First group could be the control.
opts.add_argument('-alt', "--alternate", help="Number of tails and direction for t-tests or Dunnett's tests ('two-sided' \[default], 'less' [group1 < group2], or 'greater')", default='two-sided', action=SM)
opts.add_argument('-d', '--divide', type=float, help='Divide the cell densities by the specified value for plotting (default is None)', default=None, action=SM)
opts.add_argument('-y', '--ylabel', help='Y-axis label (Default: cell_density)', default='cell_density', action=SM)
opts.add_argument('-csv', '--csv_path', help='CSV name or path/name.csv. Default: CCFv3-2020_regional_summary.csv', default='CCFv3-2020_regional_summary.csv', action=SM)
opts.add_argument('-b', '--bar_color', help="ABA (default), #hex_code, Seaborn palette, or #hex_code list matching # of groups", default='ABA', action=SM)
opts.add_argument('-s', '--symbol_color', help="ABA, #hex_code, Seaborn palette (Default: light:white), or #hex_code list matching # of groups", default='light:white', action=SM)
opts.add_argument('-o', '--output', help='Output directory for plots (Default: <t-test or tukey>_plots)', action=SM)
opts.add_argument('-e', "--extension", help="File extension for plots. Choices: pdf (default), svg, eps, tiff, png)", default='pdf', choices=['pdf', 'svg', 'eps', 'tiff', 'png'], action=SM)
general = parser.add_argument_group('General arguments')
general.add_argument('-v', '--verbose', help='Increase verbosity. Default: False', action='store_true', default=False)
return parser.parse_args()
# TODO: Dunnett's test. LH/RH averaging via summing counts and volumes before dividing counts by volumes (rather than averaging densities directly). Set up label density quantification.
# TODO: Adapt this to work for cell counts and label densities. This could also be used for mean IF intensities.
# TODO: Need a way to handle cases when some data from some samples is from one hemisphere and some from the other. (see filter_csv.py)
# TODO: Fix plots for when there are > 3 groups (comparison lines are not positioned correctly)
def get_region_details(region_id, df):
# Adjust to account for the unique region IDs.
region_row = df[(df["Region_ID"] == region_id) | (df["Region_ID"] == region_id + 20000)].iloc[0]
return region_row["Region"], region_row["Abbr"]
def parse_color_argument(color_arg, num_groups, region_id, csv_path):
if isinstance(color_arg, str):
if color_arg.startswith('[') and color_arg.endswith(']'):
# It's a string representation of a list, so evaluate it safely
color_list = ast.literal_eval(color_arg)
if len(color_list) != num_groups:
raise ValueError(f"The number of colors provided ({len(color_list)}) does not match the number of groups ({num_groups}).")
return color_list
elif color_arg.startswith('#'):
# It's a single hex color, use it for all groups
return [color_arg] * num_groups
elif color_arg == 'ABA':
# Determine the RGB color for bars based on the region_id
combined_region_id = region_id if region_id < 20000 else region_id - 20000
if csv_path == 'CCFv3-2017_regional_summary.csv' or csv_path == 'CCFv3-2020_regional_summary.csv':
results_df = pd.read_csv(Path(__file__).parent.parent / 'core' / 'csvs' / csv_path) #(Region_ID,ID_Path,Region,Abbr,General_Region,R,G,B)
results_df = pd.read_csv(csv_path)
region_rgb = results_df[results_df['Region_ID'] == combined_region_id][['R', 'G', 'B']]
rgb = tuple(region_rgb.iloc[0].values)
rgb_normalized = tuple([x / 255.0 for x in rgb])
ABA_color = sns.color_palette([rgb_normalized] * num_groups)
return ABA_color
# It's a named seaborn palette
return sns.color_palette(color_arg, num_groups)
# It's already a list (this would be the case for default values or if the input method changes)
return color_arg
def summarize_significance(test_df, id):
"""Summarize the results of the statistical tests.
- test_df (DataFrame): the DataFrame containing the test results (w/ columns: group1, group2, p-value, meandiff)
- id (int): the region or cluster ID
- summary_df (DataFrame): the DataFrame containing the summarized results
summary_rows = []
for _, row in test_df.iterrows():
group1, group2 = row['group1'], row['group2']
# Determine significance level
sig = ''
if row['p-value'] < 0.0001:
sig = '****'
elif row['p-value'] < 0.001:
sig = '***'
elif row['p-value'] < 0.01:
sig = '**'
elif row['p-value'] < 0.05:
sig = '*'
# Determine which group has a higher mean
meandiff = row['meandiff']
higher_group = group2 if meandiff > 0 else group1
'Region_ID': id,
'Comparison': f'{group1} vs {group2}',
'p-value': row['p-value'],
'Higher_Mean_Group': higher_group,
'Significance': sig
return pd.DataFrame(summary_rows)
def process_and_plot_data(df, region_id, region_name, region_abbr, side, out_dir, group_columns, test_type, args):
# Reshaping the data for plotting
reshaped_data = []
for prefix in args.groups:
for value in df[group_columns[prefix]].values.ravel():
reshaped_data.append({'group': prefix, 'density': value})
reshaped_df = pd.DataFrame(reshaped_data)
# Plotting
mpl.rcParams['font.family'] = 'Arial'
plt.figure(figsize=(4, 4))
groups = reshaped_df['group'].unique()
num_groups = len(groups)
# Parse the color arguments
bar_color = parse_color_argument(args.bar_color, num_groups, region_id, args.csv_path)
symbol_color = parse_color_argument(args.symbol_color, num_groups, region_id, args.csv_path)
# Coloring the bars and symbols
# ax = sns.barplot(x='group', y='density', data=reshaped_df, errorbar=('se'), capsize=0.1, palette=bar_color, linewidth=2, edgecolor='black')
ax = sns.barplot(x='group', y='density', hue='group', data=reshaped_df, errorbar=('se'), capsize=0.1, palette=bar_color, linewidth=2, edgecolor='black', legend=False)
sns.stripplot(x='group', y='density', hue='group', data=reshaped_df, palette=symbol_color, alpha=0.5, size=8, linewidth=0.75, edgecolor='black')
# Calculate y_max and y_min based on the actual plot
y_max = ax.get_ylim()[1]
y_min = ax.get_ylim()[0]
height_diff = (y_max - y_min) * 0.05 # Adjust the height difference as needed
y_pos = y_max * 1.05 # Start just above the highest bar
# Check which test to perform
if test_type == 't-test':
# Perform t-test for each group against the control group
control_data = df[group_columns[args.ctrl_group]].values.ravel()
test_results = []
for prefix in args.groups:
if prefix != args.ctrl_group:
other_group_data = df[group_columns[prefix]].values.ravel()
t_stat, p_value = ttest_ind(other_group_data, control_data, equal_var=True, alternative=args.alternate) # Switched to equal_var=True and alternative=args.alternate
meandiff = np.mean(other_group_data) - np.mean(control_data)
# if args.alternate == 'less' and meandiff < 0:
# p_value /= 2 # For one-tailed test, halve the p-value if the alternative is 'less'
# t_stat = -t_stat # Flip the sign for 'less'
# elif args.alternate == 'greater' and meandiff > 0:
# p_value /= 2 # For one-tailed test, halve the p-value if the alternative is 'greater'
# elif args.alternate == 'two-sided':
# pass # No change in p value needed for two-sided test
# else: # Effect direction not consistent with hypothesis
# p_value = 1
'group1': args.ctrl_group,
'group2': prefix,
't-stat': t_stat,
'p-value': p_value,
'meandiff': np.mean(other_group_data) - np.mean(control_data)
test_results_df = pd.DataFrame(test_results)
significant_comparisons = test_results_df[test_results_df['p-value'] < 0.05]
# elif test_type == 'dunnett':
# # Extract the data for the control group and the other groups
# data = [df[group_columns[prefix]].values.ravel() for prefix in args.groups if prefix != args.ctrl_group]
# control_data = df[group_columns[args.ctrl_group]].values.ravel()
# # The * operator unpacks the list so that each array is a separate argument, as required by dunnett
# dunnett_results = dunnett(*data, control=control_data, alternative=args.alternate)
# group2_data = [df[group_columns[prefix]].values.ravel() for prefix in args.groups if prefix != args.ctrl_group]
# # Convert the result to a DataFrame
# test_results_df = pd.DataFrame({
# 'group1': [args.ctrl_group] * len(dunnett_results.pvalue),
# 'group2': [prefix for prefix in args.groups if prefix != args.ctrl_group],
# 'p-value': dunnett_results.pvalue,
# 'meandiff': np.mean(group2_data, axis=1) - np.mean(control_data) # Calculate the mean difference between each group and the control group
# })
# significant_comparisons = test_results_df[test_results_df['p-value'] < 0.05]
elif test_type == 'tukey':
# Conduct Tukey's HSD test
densities = np.array([value for prefix in args.groups for value in df[group_columns[prefix]].values.ravel()]) # Flatten the data
groups = np.array([prefix for prefix in args.groups for _ in range(len(df[group_columns[prefix]].values.ravel()))])
tukey_results = pairwise_tukeyhsd(densities, groups, alpha=0.05)
# Extract significant comparisons from Tukey's results
test_results_df = pd.DataFrame(data=tukey_results.summary().data[1:], columns=tukey_results.summary().data[0])
test_results_df.rename(columns={'p-adj': 'p-value'}, inplace=True)
significant_comparisons = test_results_df[test_results_df['p-value'] < 0.05]
# Loop for plotting comparison bars and asterisks
for _, row in significant_comparisons.iterrows():
group1, group2 = row['group1'], row['group2']
x1 = np.where(groups == group1)[0][0]
x2 = np.where(groups == group2)[0][0]
# Plotting comparison lines
plt.plot([x1, x1, x2, x2], [y_pos, y_pos + height_diff, y_pos + height_diff, y_pos], lw=1.5, c='black')
# Plotting asterisks based on p-value
if row['p-value'] < 0.0001:
sig = '****'
elif row['p-value'] < 0.001:
sig = '***'
elif row['p-value'] < 0.01:
sig = '**'
sig = '*'
plt.text((x1 + x2) * .5, y_pos + 1 * height_diff, sig, horizontalalignment='center', size='xx-large', color='black', weight='bold')
y_pos += 3 * height_diff # Increment y_pos for the next comparison bar
# Remove the legend only if it exists
if ax.get_legend():
# Format the plot
if args.ylabel == 'cell_density':
ax.set_ylabel(r'Cells*10$^{4} $/mm$^{3}$', weight='bold')
ax.set_ylabel(args.ylabel, weight='bold')
ax.set_xticks(range(len(ax.get_xticklabels()))) # Set ticks based on current tick labels
ax.set_xticklabels(ax.get_xticklabels(), weight='bold')
ax.tick_params(axis='both', which='major', width=2)
plt.ylim(0, y_pos) # Adjust y-axis limit to accommodate comparison bars
ax.set_xlabel('group') ### was None
# Check if there are any significant comparisons (for prepending '_sig__' to the filename)
has_significant_results = True if significant_comparisons.shape[0] > 0 else False
# Extract the general region for the filename (output file name prefix for sorting by region)
if args.csv_path == 'CCFv3-2017_regional_summary.csv' or args.csv_path == 'CCFv3-2020_regional_summary.csv':
regional_summary = pd.read_csv(Path(__file__).parent.parent / 'core' / 'csvs' / args.csv_path) #(Region_ID,ID_Path,Region,Abbr,General_Region,R,G,B)
regional_summary = pd.read_csv(args.csv_path)
region_id = region_id if region_id < 20000 else region_id - 20000 # Adjust if left hemi
general_region = regional_summary.loc[regional_summary['Region_ID'] == region_id, 'General_Region'].values[0]
# Format the filename with '_sig__' prefix if there are significant results
prefix = '_sig__' if has_significant_results else ''
filename = f"{prefix}{general_region}__{region_id}_{region_abbr}_{side}".replace("/", "-") # Replace problematic characters
# Save the plot for each side or pooled data
title = f"{region_name} ({region_abbr}, {side})"
wrapped_title = textwrap.fill(title, 42)
plt.title(wrapped_title, pad = 20).set_position([.5, 1.05])
return test_results_df
def main():
args = parse_args()
Configuration.verbose = args.verbose
if len(args.groups) == 2:
test_type = 't-test'
elif len(args.groups) > 2:
test_type = 'tukey'
# Find all CSV files in the current directory matching *cell_densities.csv
file_list = [file for file in os.listdir('.') if file.endswith('cell_densities.csv')]
print(f"\nAggregating data from *cell_densities.csv: {file_list}\n")
# Check if files are found
if not file_list:
print(" [red1]No files found matching the pattern '*cell_densities.csv'.")
# Aggregate the data for each sample
aggregated_df = pd.read_csv(file_list[0]).iloc[:, 0:5]
for file_name in file_list:
df = pd.read_csv(file_name).iloc[:, -1:]
# Rename the column prefix to match the --groups argument
for prefix in args.groups:
if prefix.lower() in df.columns[0].lower():
old_prefix = df.columns[0].split("_")[0]
new_column_name = df.columns[0].replace(old_prefix, prefix)
df.rename(columns={df.columns[0]: new_column_name}, inplace=True)
# Append the aggregated data to the dataframe
aggregated_df = pd.concat([aggregated_df, df], axis=1)
# Sort all columns that are not part of the first five by group prefix
group_columns = sorted(aggregated_df.columns[5:], key=lambda x: args.groups.index(x.split('_')[0]))
# Sort each group's columns numerically and combine them
sorted_group_columns = []
for prefix in args.groups:
prefixed_group_columns = [col for col in group_columns if col.startswith(f"{prefix}_")]
sorted_group_columns += sorted(prefixed_group_columns, key=lambda x: int(re.search(r'\d+', x).group()))
# Combine the first five columns with the sorted group columns
sorted_columns = aggregated_df.columns[:5].tolist() + sorted_group_columns
# Now sorted_columns contains all columns, sorted by group and numerically within each group
df = aggregated_df[sorted_columns]
# Save the aggregated data as a CSV
df.to_csv('regional_cell_densities_all.csv', index=False)
# Normalization if needed
if args.divide:
df.iloc[:, 5:] = df.iloc[:, 5:].div(args.divide)
# Prepare output directories
if args.alternate == 'two-sided':
suffix = ''
suffix = f"_{args.alternate}" # Add suffix to indicate the alternative hypothesis
# Make output directories
if args.output:
if args.hemi == 'both':
out_dirs = {side: f"{args.output}_{side}{suffix}" for side in ["L", "R", "pooled"]}
elif args.hemi == 'r':
out_dirs = {side: f"{args.output}_{side}{suffix}" for side in ["R"]}
elif args.hemi == 'l':
out_dirs = {side: f"{args.output}_{side}{suffix}" for side in ["L"]}
print("--hemi should be l, r, or both")
import sys ; sys.exit()
if args.hemi == 'both':
out_dirs = {side: f"{test_type}_plots_{side}{suffix}" for side in ["L", "R", "pooled"]}
elif args.hemi == 'r':
out_dirs = {side: f"{test_type}_plots_{side}{suffix}" for side in ["R"]}
elif args.hemi == 'l':
out_dirs = {side: f"{test_type}_plots_{side}{suffix}" for side in ["L"]}
print("--hemi should be l, r, or both")
import sys ; sys.exit()
for out_dir in out_dirs.values():
os.makedirs(out_dir, exist_ok=True)
group_columns = {}
for prefix in args.groups:
group_columns[prefix] = [col for col in df.columns if col.startswith(f"{prefix}_")]
if args.hemi == 'both':
# Averaging data across hemispheres and plotting pooled data (DR)
print(f"\nPlotting and summarizing pooled data for each region...\n")
rh_df = df[df['Region_ID'] < 20000]
lh_df = df[df['Region_ID'] > 20000]
# Initialize an empty dataframe to store all summaries
all_summaries_pooled = pd.DataFrame()
# Drop first 4 columns
rh_df = rh_df.iloc[:, 5:]
lh_df = lh_df.iloc[:, 5:]
# Reset indices to ensure alignment
rh_df.reset_index(drop=True, inplace=True)
lh_df.reset_index(drop=True, inplace=True)
# Initialize pooled_df with common columns
pooled_df = df[['Region_ID', 'Side', 'ID_Path', 'Region', 'Abbr']][df['Region_ID'] < 20000].reset_index(drop=True)
pooled_df['Side'] = 'Pooled' # Set the 'Side' to 'Pooled'
# Average the cell densities for left and right hemispheres
for col in lh_df.columns:
pooled_df[col] = (lh_df[col] + rh_df[col]) / 2
# Averaging data across hemispheres and plotting pooled data
unique_region_ids = df[df["Side"] == "R"]["Region_ID"].unique()
progress, task_id = initialize_progress_bar(len(unique_region_ids), "[red]Processing regions (pooled)...")
with Live(progress):
for region_id in unique_region_ids:
region_name, region_abbr = get_region_details(region_id, df)
out_dir = out_dirs["pooled"]
comparisons_summary = process_and_plot_data(pooled_df[pooled_df["Region_ID"] == region_id], region_id, region_name, region_abbr, "Pooled", out_dir, group_columns, test_type, args)
summary_df = summarize_significance(comparisons_summary, region_id)
all_summaries_pooled = pd.concat([all_summaries_pooled, summary_df], ignore_index=True)
progress.update(task_id, advance=1)
# Merge with the original CCFv3-2020_regional_summary.csv and write to a new CSV
if args.csv_path == 'CCFv3-2017_regional_summary.csv' or args.csv_path == 'CCFv3-2020_regional_summary.csv':
regional_summary = pd.read_csv(Path(__file__).parent.parent / 'core' / 'csvs' / args.csv_path) #(Region_ID,ID_Path,Region,Abbr,General_Region,R,G,B)
regional_summary = pd.read_csv(args.csv_path)
final_summary_pooled = pd.merge(regional_summary, all_summaries_pooled, on='Region_ID', how='left')
final_summary_pooled.to_csv(Path(out_dir) / '__significance_summary_pooled.csv', index=False)
# Perform analysis and plotting for each hemisphere
if args.hemi == 'r':
sides_to_process = ["R"]
elif args.hemi == 'l':
sides_to_process = ["L"]
sides_to_process = ["L", "R"]
for side in sides_to_process:
print(f"\nPlotting and summarizing data for {side} hemisphere...\n")
# Initialize an empty dataframe to store all summaries
all_summaries = pd.DataFrame()
side_df = df[df['Side'] == side]
unique_region_ids = side_df["Region_ID"].unique() # Get unique region IDs for the current side
progress, task_id = initialize_progress_bar(len(unique_region_ids), f"[red]Processing regions ({side})...")
with Live(progress):
for region_id in unique_region_ids:
region_name, region_abbr = get_region_details(region_id, side_df)
out_dir = out_dirs[side]
comparisons_summary = process_and_plot_data(side_df[side_df["Region_ID"] == region_id], region_id, region_name, region_abbr, side, out_dir, group_columns, test_type, args)
summary_df = summarize_significance(comparisons_summary, region_id)
all_summaries = pd.concat([all_summaries, summary_df], ignore_index=True)
progress.update(task_id, advance=1)
# Merge with the original CCFv3-2020_regional_summary.csv and write to a new CSV
if args.csv_path == 'CCFv3-2017_regional_summary.csv' or args.csv_path == 'CCFv3-2020_regional_summary.csv':
regional_summary = pd.read_csv(Path(__file__).parent.parent / 'core' / 'csvs' / args.csv_path) #(Region_ID,ID_Path,Region,Abbr,General_Region,R,G,B)
regional_summary = pd.read_csv(args.csv_path)
# Adjust Region_ID for left hemisphere
if side == "L":
all_summaries["Region_ID"] = all_summaries["Region_ID"] - 20000
final_summary = pd.merge(regional_summary, all_summaries, on='Region_ID', how='left')
final_summary.to_csv(Path(out_dir) / f'__significance_summary_{side}.csv', index=False)
if __name__ == '__main__':