Source code for ClearMap.Analysis.Statistics.GroupStatistics

# -*- coding: utf-8 -*-

Create some statistics to test significant changes in voxelized and labeled 
__author__    = 'Christoph Kirst <>'
__license__   = 'GPLv3 - GNU General Pulic License v3 (see LICENSE.txt)'
__copyright__ = 'Copyright © 2020 by Christoph Kirst'
__webpage__   = ''
__download__  = ''

import numpy as np
from scipy import stats

import ClearMap.IO.IO as io

import ClearMap.Alignment.Annotation as ano
import ClearMap.Analysis.Statistics.StatisticalTests as st

[docs]def t_test_voxelization(group1, group2, signed = False, remove_nan = True, p_cutoff = None): """t-Test on differences between the individual voxels in group1 and group2 Arguments --------- group1, group2 : array of arrays The group of voxelizations to compare. signed : bool If True, return also the direction of the changes as +1 or -1. remove_nan : bool Remove Nan values from the data. p_cutoff : None or float Optional cutoff for the p-values. Returns ------- p_values : array The p values for the group wise comparison. """ group1 = read_group(group1); group2 = read_group(group2); tvals, pvals = stats.ttest_ind(group2, group2, axis=0, equal_var=True); #remove nans if remove_nan: pi = np.isnan(pvals); pvals[pi] = 1.0; tvals[pi] = 0; pvals = cutoff_p_values(pvals, p_cutoff=p_cutoff); #return if signed: return pvals, np.sign(tvals); else: return pvals;
#TODO: group sources in IO
[docs]def read_group(sources, combine = True, **args): """Turn a list of sources for data into a numpy stack. Arguments --------- sources : list of str or sources The sources to combine. combine : bool If true combine the sources to ndarray, oterhwise return a list. Returns ------- group : array or list The gorup data. """ #check if stack already: if isinstance(sources, np.ndarray): return sources; #read the individual files group = []; for f in sources: data = io.as_source(f, **args).array; data = np.reshape(data, (1,) + data.shape); group.append(data); if combine: return np.vstack(group); else: return group;
[docs]def cutoff_p_values(pvals, p_cutoff = 0.05): """cutt of p-values above a threshold. Arguments --------- p_valiues : array The p values to truncate. p_cutoff : float or None The p-value cutoff. If None, do not cut off. Returns ------- p_values : array Cut off p-values. """ pvals2 = pvals.copy(); pvals2[pvals2 > p_cutoff] = p_cutoff; return pvals2;
[docs]def color_p_values(pvals, psign, positive = [1,0], negative = [0,1], p_cutoff = None, positive_trend = [0,0,1,0], negative_trend = [0,0,0,1], pmax = None): pvalsinv = pvals.copy(); if pmax is None: pmax = pvals.max(); pvalsinv = pmax - pvalsinv; if p_cutoff is None: # color given p values d = len(positive); ds = pvals.shape + (d,); pvc = np.zeros(ds); #color ids = psign > 0; pvalsi = pvalsinv[ids]; for i in range(d): pvc[ids, i] = pvalsi * positive[i]; ids = psign < 0; pvalsi = pvalsinv[ids]; for i in range(d): pvc[ids, i] = pvalsi * negative[i]; return pvc; else: # split pvalues according to cutoff d = len(positive_trend); if d != len(positive) or d != len(negative) or d != len(negative_trend) : raise RuntimeError('colorPValues: postive, negative, postivetrend and negativetrend option must be equal length!'); ds = pvals.shape + (d,); pvc = np.zeros(ds); idc = pvals < p_cutoff; ids = psign > 0; ##color # significant postive ii = np.logical_and(ids, idc); pvalsi = pvalsinv[ii]; w = positive; for i in range(d): pvc[ii, i] = pvalsi * w[i]; #non significant postive ii = np.logical_and(ids, np.negative(idc)); pvalsi = pvalsinv[ii]; w = positive_trend; for i in range(d): pvc[ii, i] = pvalsi * w[i]; # significant negative ii = np.logical_and(np.negative(ids), idc); pvalsi = pvalsinv[ii]; w = negative; for i in range(d): pvc[ii, i] = pvalsi * w[i]; #non significant postive ii = np.logical_and(np.negative(ids), np.negative(idc)) pvalsi = pvalsinv[ii]; w = negative_trend; for i in range(d): pvc[ii, i] = pvalsi * w[i]; return pvc;
[docs]def mean(group, **args): g = read_group(group, **args); return g.mean(axis = 0);
[docs]def std(group, **args): g = read_group(group, **args); return g.std(axis = 0);
[docs]def var(group, **args): g = read_group(group, **args); return g.var(axis = 0);
[docs]def weights_from_precentiles(intensities, percentiles = [25,50,75,100]): perc = np.percentiles(intensities, percentiles); weights = np.zeros(intensities.shape); for p in perc: ii = intensities > p; weights[ii] = weights[ii] + 1; return weights;
# needs clean up
[docs]def count_points_group_in_regions(point_group, annotation_file = ano.default_annotation_file, weight_group = None, invalid = 0, hierarchical = True): """Generates a table of counts for the various point datasets in pointGroup""" if intensity_group is None: counts = [ano.count_points(point_group[i], annotation_file=annotation_file, invalid=invalid, hierarchical=heirarchical) for i in range(len(point_group))]; else: counts = [ano.count_points(point_group[i], weight=weight_group[i], annotation_file=annotation_file, invalid=invalid, hierarchical=heirarchical) for i in range(len(point_group))]; counts = np.vstack(counts).T; return counts;
# needs clean up
[docs]def t_test_region_countss(counts1, counts2, annotation_file = ano.default_annotation_file, signed = False, remove_nan = True, p_cutoff = None, equal_var = False): """t-Test on differences in counts of points in labeled regions""" #ids, p1 = countPointsGroupInRegions(pointGroup1, labeledImage = labeledImage, withIds = True); #p2 = countPointsGroupInRegions(pointGroup2, labeledImage = labeledImage, withIds = False); tvals, pvals = st.ttest_ind(counts1, counts2, axis=1, equal_var=equal_var); #remove nans if remove_nan: pi = np.isnan(pvals); pvals[pi] = 1.0; tvals[pi] = 0; pvals = cutoff_p_values(pvals, p_cutoff = p_cutoff); #pvals.shape = (1,) + pvals.shape; #ids.shape = (1,) + ids.shape; #pvals = numpy.concatenate((ids.T, pvals.T), axis = 1); if signed: return pvals, np.sign(tvals); else: return pvals;
[docs]def test_completed_cumulatives(data, method = 'AndersonDarling', offset = None, plot = False): """Test if data sets have the same number / intensity distribution by adding max intensity counts to the smaller sized data sets and performing a distribution comparison test""" #idea: fill up data points to the same numbers at the high intensity values and use KS test #cf. work in progress on thoouroghly testing the differences in histograms #fill up the low count data n = np.array([x.size for x in data]); nm = n.max(); m = np.array([x.max() for x in data]); mm = m.max(); k = n.size; #print nm, mm, k if offset is None: #assume data starts at 0 ! offset = mm / nm; #ideall for all statistics this should be mm + eps to have as little influence as possible. datac = [x.copy() for x in data]; for i in range(m.size): if n[i] < nm: datac[i] = np.concatenate((datac[i], np.ones(nm-n[i], dtype = datac[i].dtype) * (mm + offset))); # + 10E-5 * numpy.random.rand(nm-n[i]))); #test by plotting if plot is True: import matplotlib.pyplot as plt; for i in range(m.size): datac[i].sort(); plt.step(datac[i], np.arange(datac[i].size)); #perfomr the tests if method == 'KolmogorovSmirnov' or method == 'KS': if k == 2: (s, p) = stats.ks_2samp(datac[0], datac[1]); else: raise RuntimeError('KolmogorovSmirnov only for 2 samples not %d' % k); elif method == 'CramervonMises' or method == 'CM': if k == 2: (s,p) = st.test_cramer_von_mises_2_sample(datac[0], datac[1]); else: raise RuntimeError('CramervonMises only for 2 samples not %d' % k); elif method == 'AndersonDarling' or method == 'AD': (s,a,p) = stats.anderson_ksamp(datac); return (p,s);
[docs]def test_completed_inverted_cumulatives(data, method = 'AndersonDarling', offset = None, plot = False): """Test if data sets have the same number / intensity distribution by adding zero intensity counts to the smaller sized data sets and performing a distribution comparison test on the reversed cumulative distribution""" #idea: fill up data points to the same numbers at the high intensity values and use KS test #cf. work in progress on thoouroghly testing the differences in histograms #fill up the low count data n = np.array([x.size for x in data]); nm = n.max(); m = np.array([x.max() for x in data]); mm = m.max(); k = n.size; #print nm, mm, k if offset is None: #assume data starts at 0 ! offset = mm / nm; #ideall for all statistics this should be mm + eps to have as little influence as possible. datac = [x.copy() for x in data]; for i in range(m.size): if n[i] < nm: datac[i] = np.concatenate((-datac[i], np.ones(nm-n[i], dtype = datac[i].dtype) * (offset))); # + 10E-5 * numpy.random.rand(nm-n[i]))); else: datac[i] = -datac[i]; #test by plotting if plot is True: import matplotlib.pyplot as plt; for i in range(m.size): datac[i].sort(); plt.step(datac[i], np.arange(datac[i].size)); #perfomr the tests if method == 'KolmogorovSmirnov' or method == 'KS': if k == 2: (s, p) = stats.ks_2samp(datac[0], datac[1]); else: raise RuntimeError('KolmogorovSmirnov only for 2 samples not %d' % k); elif method == 'CramervonMises' or method == 'CM': if k == 2: (s,p) = st.test_cramer_von_mises_2_sample(datac[0], datac[1]); else: raise RuntimeError('CramervonMises only for 2 samples not %d' % k); elif method == 'AndersonDarling' or method == 'AD': (s,a,p) = stats.anderson_ksamp(datac); return (p,s);
[docs]def test_completed_cumulatives_in_spheres(points1, intensities1, points2, intensities2, shape = ano.default_annotation_file, radius = 100, method = 'AndresonDarling'): """Performs completed cumulative distribution tests for each pixel using points in a ball centered at that cooridnates, returns 4 arrays p value, statistic value, number in each group""" #TODO: sinple implementation -> slow -> speed up if not isinstance(shape, tuple): shape = io.shape(shape); if len(shape) != 3: raise RuntimeError('Shape expected to be 3d, found %r' % (shape,)); # distances^2 to origin x1= points1[:,0]; y1 = points1[:,1]; z1 = points1[:,2]; i1 = intensities1; d1 = x1 * x1 + y1 * y1 + z1 * z1; x2 = points2[:,0]; y2 = points2[:,1]; z2 = points2[:,2]; i2 = intensities2; d2 = x2 * x2 + y2 * y2 + z2 * z2; r2 = radius * radius; # TODO: inhomogenous in 3d ! p = np.zeros(dataSize); s = np.zeros(dataSize); n1 = np.zeros(dataSize, dtype = 'int'); n2 = np.zeros(dataSize, dtype = 'int'); for x in range(dataSize[0]): #print x for y in range(dataSize[1]): #print y for z in range(dataSize[2]): #print z d11 = d1 - 2 * (x * x1 + y * y1 + z * z1) + (x*x + y*y + z*z); d22 = d2 - 2 * (x * x2 + y * y2 + z * z2) + (x*x + y*y + z*z); ii1 = d11 < r2; ii2 = d22 < r2; n1[x,y,z] = ii1.sum(); n2[x,y,z] = ii2.sum(); if n1[x,y,z] > 0 and n2[x,y,z] > 0: (pp, ss) = self.testCompletedCumulatives((i1[ii1], i2[ii2]), method = method); else: pp = 0; ss = 0; p[x,y,z] = pp; s[x,y,z] = ss; return (p,s,n1,n2);
############################################################################### ### Tests ############################################################################### def _test(): """Test the statistics array""" import numpy as np import ClearMap.Analysis.Statistics.GroupStatistics as st s = np.ones((5,4,20)); s[:, 0:3, :] = - 1; x = np.random.rand(4,4,20); y = np.random.rand(5,4,20) + s; pvals, psign = st.t_test_voxelization(x,y, signed = True); pvalscol = st.color_p_values(pvals, psign, positive = [255,0,0], negative = [0,255,0]) import ClearMap.Visualization.Plot3d as p3d p3d.plot(pvalscol)