Source code for ClearMap.Analysis.Graphs.GraphRendering

#!/usr/bin/env python2
# -*- coding: utf-8 -*-
"""
GraphVisualization
==================

Module providing tools to create meshes and visualize graphs.

"""
__author__    = 'Christoph Kirst <christoph.kirst.ck@gmail.com>'
__license__   = 'GPLv3 - GNU General Pulic License v3 (see LICENSE)'
__copyright__ = 'Copyright © 2020 by Christoph Kirst'
__webpage__   = 'http://idisco.info'
__download__  = 'http://www.github.com/ChristophKirst/ClearMap2'


import numpy as np

import functools as ft

import vispy.util.transforms as trf

import ClearMap.ParallelProcessing.SharedMemoryManager as smm
import ClearMap.ParallelProcessing.ParallelTraceback as ptb



###############################################################################
### Mesh generation
###############################################################################

[docs]def mesh(graph = None, method = 'tubes', **kwargs): if method == 'tubes': return mesh_tube(graph, **kwargs); else: ValueError('Method n%r not valid!' % method);
############################################################################### ### Graph mesh using tubes ###############################################################################
[docs]def mesh_tube(graph = None, coordinates = None, radii = None, indices = None, color = None, vertex_colors = None, edge_colors = None, n_tube_points = None, default_radius = 1, default_color = (0.8, 0.1, 0.1, 1.0), processes = None, verbose = False): """Construct mesh from edge geometry of a graph.""" if graph is not None: if graph.has_edge_geometry(): if coordinates is None: name = 'coordinates'; if isinstance(coordinates, str): coordinates, indices = graph.edge_geometry(name=name, return_indices=True, as_list=False); else: raise ValueError('Expected coordinates to by None or str, found %r!' % coordinates) try: if radii is None: name = 'radii'; else: name = radii; radii = graph.edge_geometry(name=name, return_indices=False, as_list=False); except: print('No radii found in the graph, using uniform radii = %r!' % default_radius); radii = default_radius * np.ones(coordinates.shape[0]); else: coordinates = graph.vertex_coordinates(); indices = graph.edge_connectivity().flatten(); coordinates = np.vstack(coordinates[indices]); try: radii = graph.vertex_radii(); radii = radii[indices]; except: print('No radii found in the graph, using uniform radii = %r!' % default_radius); radii = default_radius * np.ones(coordinates.shape[0]); n_edges = graph.n_edges; indices = np.array([2*np.arange(0,n_edges), 2*np.arange(1,n_edges+1)]).T; if vertex_colors is not None or edge_colors is not None: color = None; if color is None and vertex_colors is None: color = default_color; if vertex_colors is not None: connectivity = graph.edge_connectivity(); edge_colors = (vertex_colors[connectivity[:,0]] + vertex_colors[connectivity[:,1]])/2.0; vertices, faces, colors = mesh_tube_from_coordinates_and_radii(coordinates, radii, indices, n_tube_points=n_tube_points, edge_colors=edge_colors, processes=None); return vertices, faces, colors
[docs]def mesh_tube_from_coordinates_and_radii(coordinates, radii, indices, n_tube_points = None, edge_colors = None, processes = None, verbose = False): """Construct a mesh from the edge geometry of a graph.""" coordinates_hdl = smm.insert(coordinates); radii_hdl = smm.insert(radii); indices_hdl = smm.insert(indices); if n_tube_points is None: n_tube_points = 8; func = ft.partial(_parallel_mesh, coordinates_hdl=coordinates_hdl, radii_hdl=radii_hdl, indices_hdl=indices_hdl, n_tube_points=n_tube_points, verbose=verbose); argdata = np.arange(len(indices)); # process in parallel pool = smm.mp.Pool(processes = processes); results = pool.map(func, argdata); pool.close(); pool.join(); smm.free(coordinates_hdl); smm.free(radii_hdl); smm.free(indices_hdl); n_results = len(results); vertices = [np.reshape(r[0],(-1,3)) for r in results]; n_indices = [len(v) for v in vertices]; n_indices = np.cumsum(n_indices); n_indices = np.hstack([[0], n_indices]); if edge_colors is not None: colors = [len(v) * [c] for v,c in zip(vertices, edge_colors)]; colors = np.concatenate(colors); #print(colors.shape) else: colors = None; vertices = np.concatenate(vertices); #print(grid.shape) faces = np.concatenate([results[i][1] + n_indices[i] for i in range(n_results)]); return vertices, faces, colors
def _mesh(coordinates, radii, n_tube_points = 15, dtype = 'uint32'): n_coordinates = len(coordinates); tangents, normals, binormals = _frenet_frames(coordinates) #circular tube v = np.arange(n_tube_points, dtype = float) / n_tube_points * 2 * np.pi; c = np.cos(v); s = np.sin(v); r = radii[:, np.newaxis]; n = normals * r; b = binormals * r; #grid shape (npoints, ntube, 3) grid = coordinates[:, np.newaxis, :] + c[np.newaxis, :, np.newaxis] * n[:, np.newaxis, :] + s[np.newaxis, :, np.newaxis] * b[:, np.newaxis, :]; # construct the mesh n_segments = n_coordinates - 1; jp = np.ones(n_segments*n_tube_points, dtype = int); jp[n_tube_points * np.arange(n_segments, dtype = int) - 1] -= n_tube_points; i1 = np.arange(n_segments*n_tube_points, dtype = int); i2 = i1 + n_tube_points; i3 = i2 + jp; i4 = i1 + jp; indices = np.array(np.vstack([np.array([i1,i2,i4]).T, np.array([i2, i3, i4]).T]), dtype=dtype); return grid, indices def _frenet_frames(coordinates): """Calculates and returns the tangents, normals and binormals for a chain of coordinates.""" npoints = len(coordinates); epsilon = 0.0001 # compute tangent vectors for each segment tangents = np.roll(coordinates, -1, axis=0) - np.roll(coordinates, 1, axis=0) tangents[0] = coordinates[1] - coordinates[0] tangents[-1] = coordinates[-1] - coordinates[-2] tangents = (tangents.T / np.linalg.norm(tangents, axis = 1)).T; # get initial normal and binormal t = np.abs(tangents[0]) smallest = np.argmin(t) normal = np.zeros(3) normal[smallest] = 1. vec = np.cross(tangents[0], normal) normals = np.zeros((npoints, 3)); normals[0] = np.cross(tangents[0], vec) # compute changea along trajectory theta = np.arccos(np.clip(np.sum(tangents[:-1] * tangents[1:], axis = 1),-1,1)); vec = np.cross(tangents[:-1], tangents[1:]); nrm = np.linalg.norm(vec, axis = 1); # compute normal and binormal vectors along the path for i in range(npoints-1): normals[i+1] = normals[i] if nrm[i] > epsilon: v = vec[i] / nrm[i]; normals[i+1] = trf.rotate(-np.degrees(theta[i]), v)[:3, :3].dot(normals[i+1]) binormals = np.cross(tangents, normals) return tangents, normals, binormals @ptb.parallel_traceback def _parallel_mesh(i, coordinates_hdl, radii_hdl, indices_hdl, n_tube_points = 15, verbose = False): coordinates = smm.get(coordinates_hdl); radii = smm.get(radii_hdl); start,end = smm.get(indices_hdl)[i]; coordinates = coordinates[start:end]; radii = radii[start:end]; if verbose: if i % 1000 == 0: print('Mesh calculation %d / %d.' % (i, len(smm.get(indices_hdl)))); return _mesh(coordinates, radii, n_tube_points) ############################################################################### ### Render graph using intrepolation ###############################################################################
[docs]def interpolate_edge_geometry(graph, smooth = 5, order = 2, points_per_pixel = 0.5, processes = None, verbose = False): """Smooth center lines and radii of the edge geometry.""" if not graph.has_edge_geometry(): raise ValueError('Graph has no edge geometry!') coordinates, indices = graph.edge_geometry('coordinates', return_indices=True, as_list=False); radii = graph.edge_geometry('radii', as_list=False); # prepare result arrays #indices_interp = np.array([_n_points_per_edge(i[1]-i[0], points_per_pixel=points_per_pixel) for i in indices]); #indices_interp = np.cumsum(indices_interp); #indices_interp = np.hstack([[0], indices_interp]) #indices_interp = np.array([indices_interp[:-1], indices_interp[1:]]).T; #coordinates_interp = np.zeros((indices_interp[-1,1], coordinates.shape[1]), dtype=float); #radii_interp = np.zeros(indices_interp[-1,1], dtype=float); # process in parallel coordinates_hdl = smm.insert(coordinates); radii_hdl = smm.insert(radii); indices_hdl = smm.insert(indices); # coordinates_interp_hdl = smm.insert(coordinates_interp); # radii_interp_hdl = smm.insert(radii_interp); # indices_interp_hdl = smm.insert(indices_interp); func = ft.partial(_parallel_interpolate, coordinates_hdl=coordinates_hdl, radii_hdl=radii_hdl, indices_hdl=indices_hdl, # coordinates_interp_hdl=coordinates_interp_hdl, radii_interp_hdl=radii_interp_hdl, indices_interp_hdl=indices_interp_hdl, smooth=smooth, order=order, points_per_pixel=points_per_pixel, verbose=verbose); argdata = np.arange(len(indices)); pool = smm.mp.Pool(processes = processes); results = pool.map(func, argdata); pool.close(); pool.join(); smm.free(coordinates_hdl); smm.free(radii_hdl); smm.free(indices_hdl); # smm.free(coordinates_interp_hdl); # smm.free(radii_interp_hdl); # smm.free(indices_interp_hdl); indices_interp = np.array([len(r[1]) for r in results]); indices_interp = np.cumsum(indices_interp); indices_interp = np.hstack([[0], indices_interp]) indices_interp = np.array([indices_interp[:-1], indices_interp[1:]]).T; coordinates_interp = np.vstack([r[0] for r in results]); radii_interp = np.hstack([r[1] for r in results]); return coordinates_interp, radii_interp, indices_interp
@ptb.parallel_traceback def _parallel_interpolate(i, coordinates_hdl, radii_hdl, indices_hdl, # coordinates_interp_hdl, radii_interp_hdl, indices_interp_hdl, smooth = 5, order = 2, points_per_pixel = 0.5, verbose = False): coordinates = smm.get(coordinates_hdl); radii = smm.get(radii_hdl); start,end = smm.get(indices_hdl)[i]; # coordinates_interp = smm.get(coordinates_interp_hdl); # radii_interp = smm.get(radii_interp_hdl); # start_interp,end_interp = smm.get(indices_interp_hdl)[i]; coordinates = coordinates[start:end]; radii = radii[start:end]; if verbose: if i % 1000 == 0: print('Mesh interpolation %d / %d.' % (i, len(smm.get(indices_hdl)))); #coordinates_interp[start:end], radii_interp[start:end]= n_points = _n_points_per_edge(end-start, points_per_pixel=points_per_pixel) #n_points = end_interp - start_interp; #coordinates_interp[start_interp:end_interp], radii_interp[start_interp:end_interp] = return _interpolate_edge(coordinates, radii, n_points=n_points, smooth=smooth, order=order); import ClearMap.Analysis.Curves.Resampling as crs def _n_points_per_edge(n_pixel, points_per_pixel): n_points = int(np.ceil(n_pixel * points_per_pixel)); n_points = max(2, n_points); return n_points; def _interpolate_edge(coordinates, radii, n_points, smooth = 5, order = 2): order = min(coordinates.shape[0]-1, order); coordinates_interp = crs.resample(coordinates, n_points=n_points, smooth=smooth, order=order); radii_interp = crs.resample(radii, n_points=n_points, smooth=smooth, order=order); return coordinates_interp, radii_interp; ############################################################################### ### Tests ############################################################################### def _test(): import numpy as np import ClearMap.Tests.Files as tf import ClearMap.Analysis.Graphs.GraphProcessing as gp #reload(gp) skeleton = tf.source('skeleton'); #import ClearMap.Visualization.Plot3d as p3d #p3d.plot(skeleton) #reload(gp) g = gp.graph_from_skeleton(skeleton) g.has_edge_geometry() g.vertex_coordinates() s = g.skeleton() assert np.all(s==skeleton) gc = gp.clean_graph(g, verbose=True) gr = gp.reduce_graph(gc, verbose=True) coordinates, indices = gr.edge_geometry(return_indices=True, as_list=False); gr.set_edge_geometry(name='radii', values=np.ones(len(coordinates))) radii = gr.edge_geometry('radii', as_list=False); import ClearMap.Analysis.Graphs.GraphVisualization as gv reload(gv) grid, grid_indices = gv.mesh_from_edge_geometry(coordinates=coordinates, radii=radii, indices=indices); import ClearMap.Visualization.GraphVisual as gvi gvi.plot_mesh(grid, grid_indices)