BlockProcessing

Module to process large data sets in parallel, block by block.

Splitting the data into blocks allows memory-intensive processing of larger data sets.

Example

>>> import numpy as np
>>> import ClearMap.IO.IO as io
>>> import ClearMap.ParallelProcessing.BlockProcessing as bp
>>> source = io.as_source(np.asarray(np.random.rand(50,100,200), order = 'F'))
>>> blocks = bp.split_into_blocks(source, processes=10, axes=[2], size_min=30, size_max=50, overlap=20)
>>> blocks[0]
Block-Numpy-Source(50, 100, 38)[float64]|F|
>>> blocks[0].info()
'0/10<(0, 0, 0)/(1, 1, 10)> (50, 100, 38)@(50, 100, 200)[(:,:,0:38)]'
>>> b = blocks[0]
>>> b.valid
'Sliced-Block-Numpy-Source(50, 100, 28)[float64]|F|'
>>> print(b.valid.base_shape)
(50, 100, 200)
>>> print(b.valid.base_slicing)
(slice(None, None, None), slice(None, None, None), slice(None, 28, None))
>>> print(b.iteration)
0
>>> shape = (2,3,20)
>>> source = io.npy.Source(array = np.random.rand(*shape))
>>> sink = io.npy.Source(array = np.zeros(shape))
>>>
>>> def process_image(source, sink=None):
...     # Scale the source by 100 and write the result into the sink.
...     if sink is None:
...         sink = np.zeros(source.shape)
...     sink[:] = 100 * source[:]
...     return sink
...
>>> # Process serially in blocks of at most 4 elements along axis 2.
>>> bp.process(process_image, source, sink,
...            processes = 'serial', size_max = 4, size_min = 1, overlap = 0, axes = [2],
...            optimization = True, verbose = True)
>>>
>>> print(np.all(sink[:] == process_image(source)))
True
>>> # Same processing with overlapping blocks and axes = all.
>>> bp.process(process_image, source, sink,
...            processes = None, size_max = 10, size_min = 6, overlap = 3, axes = all,
...            optimization = True, verbose = True)

block_axes(source, axes=None)

Determine the axes for block processing from source order.

Arguments

source : array or Source

The source on which the block processing is used.

axes : list or None

The axes over which to split the block processing.

Returns

axes : list or None

The axes over which to split the block processing.
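
The selection rule can be sketched as follows. This is a minimal illustration, not the ClearMap implementation: the helper name `sketch_block_axes` and its `ndim` and `order` parameters are hypothetical. The rule it applies is an assumption: split along the last axis for Fortran order and along the first axis for C order, which is the slowest-varying axis in each layout, so that blocks stay contiguous in memory. This is consistent with the module example, where an 'F'-ordered source is split along axis 2. Treating the builtin `all` as a marker for "every axis" is likewise an assumption based on the `axes = all` call in that example.

```python
def sketch_block_axes(ndim, order, axes=None):
    """Hypothetical stand-in for block_axes; an assumption, not ClearMap code."""
    if axes is all:
        # Assumption: the builtin `all` acts as a sentinel for "every axis".
        return list(range(ndim))
    if axes is not None:
        # Explicit axes are only validated and returned.
        axes = [axes] if isinstance(axes, int) else list(axes)
        if min(axes) < 0 or max(axes) >= ndim:
            raise ValueError("axes %r out of range for %d dimensions" % (axes, ndim))
        return axes
    # Assumption: split along the slowest-varying axis of the memory layout.
    return [ndim - 1] if order == 'F' else [0]


print(sketch_block_axes(3, 'F'))        # [2]
print(sketch_block_axes(3, 'C'))        # [0]
print(sketch_block_axes(3, 'C', [1]))   # [1]
```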

block_sizes(size, processes=None, size_max=None, size_min=None, overlap=None, optimization=True, optimization_fix='all', verbose=False)

Calculates the block sizes along a single axis when splitting up a source.

Arguments

size : int

Size of the array dimension to be split up.

processes : int

Number of parallel processes to use.

size_max : int or None

Maximal size of a block. If None, do not split.

size_min : int, ‘fixed’, or None

Minimal size of a block. If ‘fixed’, blocks have the fixed size given by size_max, and the overlap is increased if the last block is too small. If None, the minimal size is determined from the overlap.

overlap : int or None

Minimal overlap between blocks in a single axis. If None, the overlap defaults to zero.

optimization : bool

If True, optimize block sizes to best fit the number of processes.

optimization_fix : ‘increase’, ‘decrease’, ‘all’ or None

Increase, decrease or optimally change the block size when optimization is active.

verbose : bool

Print information on block generation.

Returns

n_blocks : int

Number of blocks.

block_ranges : list of tuple of ints

Ranges of the blocks of the form [(lo0,hi0),(lo1,hi1),…].

valid_ranges : list of tuple of ints

Valid ranges of the blocks of the form [(lo0,hi0),(lo1,hi1),…].

Note

The optimization allows block sizes to change slightly to better distribute the blocks over processes, assuming each block takes a similar amount of time to process.
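
The relation between block ranges and valid ranges along one axis can be sketched in a few lines. This is a simplified illustration under stated assumptions, not the algorithm used by block_sizes: the helper `sketch_block_ranges` is hypothetical, takes the number of blocks as given, ignores size_min, size_max and the optimization step, and cuts every overlap at its midpoint to obtain the valid ranges. For the parameters of the module example (size 200, 10 blocks, overlap 20) it reproduces the first block (0, 38) and its valid range (0, 28); whether the real function yields the same ranges for the remaining blocks is not verified here.

```python
def sketch_block_ranges(size, n_blocks, overlap):
    """Split range(size) into n_blocks equal blocks sharing `overlap` elements.

    Hypothetical simplification: no size_min/size_max handling, no optimization.
    """
    if n_blocks < 1 or overlap < 0:
        raise ValueError("need n_blocks >= 1 and overlap >= 0")
    # n_blocks * block_size - (n_blocks - 1) * overlap must equal size.
    block_size = (size + (n_blocks - 1) * overlap) / n_blocks
    step = block_size - overlap
    if step <= 0:
        raise ValueError("overlap too large for this size")

    block_ranges = []
    for i in range(n_blocks):
        lo = int(round(i * step))
        # The last block always ends exactly at the end of the axis.
        hi = size if i == n_blocks - 1 else int(round(i * step + block_size))
        block_ranges.append((lo, hi))

    # Cut each overlap at its midpoint so the valid ranges tile the axis.
    cuts = [0]
    for (_, prev_hi), (next_lo, _) in zip(block_ranges, block_ranges[1:]):
        cuts.append((prev_hi + next_lo) // 2)
    cuts.append(size)
    valid_ranges = list(zip(cuts[:-1], cuts[1:]))
    return n_blocks, block_ranges, valid_ranges


n_blocks, block_ranges, valid_ranges = sketch_block_ranges(200, 10, 20)
print(block_ranges[0], valid_ranges[0])   # (0, 38) (0, 28)
```

Because the valid ranges are disjoint and cover the whole axis, each element of the result is written by exactly one block.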

process(function, source, sink=None, axes=None, size_max=None, size_min=None, overlap=None, optimization=True, optimization_fix='all', neighbours=False, function_type=None, as_memory=False, return_result=False, return_blocks=False, processes=None, verbose=False, **kwargs)

Create blocks and apply a function to them in parallel.

Arguments

function : function

The function that processes the data of each block.

source : str, Source, or list

The source or list of sources to apply a function to.

sink : str, Source, list, or None

The sink or list of sinks to write the result to. If None, return single array.

axes : int, list of ints, or None

Axes along which to split the source. If None, the splitting is determined automatically from the order of the array.

size_max : int, list of ints or None

Maximal size of a block along the axes. If None, default_size_max is used.

size_min : int, list of ints or None

Minimal size of a block along the axes. If None, default_size_min is used.

overlap : int, list of ints or None

Minimal overlap between blocks along the axes. If None, default_overlap is used.

optimization : bool or list of bools

If True, optimize block sizes to best fit the number of processes.

optimization_fix : ‘increase’, ‘decrease’, ‘all’ or None or list

Increase, decrease or optimally change the block size when optimization is active.

neighbours : bool

If True, also include information about the neighbourhood in the blocks.

function_type : ‘array’, ‘source’, ‘block’ or None

The function type passed. If None, ‘array’ is used.

  • ‘array’: Reading and writing the valid slices from the blocks is automatic and the function gets passed numpy arrays.

  • ‘source’: Reading and writing the valid slices from the blocks is automatic and the function gets passed Source classes as inputs.

  • ‘block’: The function is assumed to act on and update blocks itself.

as_memory : bool

If True, load full blocks into memory before applying the function. Can be useful to reduce frequent reading and writing operations of memmaps.

return_result : bool

If True, return the results of the processing functions.

return_blocks : bool

If True, return the block information used to distribute the processing.

processes : int, ‘serial’ or None

The number of parallel processes. If ‘serial’, use serial processing.

verbose : bool

Print information on sub-stack generation.

Returns

sink : str, Source, list or array

The results of the processing.

Note

This implementation only supports processing into sinks with the same shape as the source.
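
The role of the overlap and the valid ranges in block processing can be illustrated with plain Python lists. This is a sketch under stated assumptions: `moving_average` and `process_in_blocks` are hypothetical helpers, the block and valid ranges are hand-picked, and the real process function additionally handles sources, sinks, multiple axes and parallel execution.

```python
def moving_average(values, radius=1):
    """Average each element with its neighbours; edges use what is available."""
    n = len(values)
    out = []
    for i in range(n):
        window = values[max(0, i - radius):min(n, i + radius + 1)]
        out.append(sum(window) / len(window))
    return out


def process_in_blocks(function, source, block_ranges, valid_ranges):
    """Apply function to each block and keep only its valid part."""
    sink = [0.0] * len(source)  # same shape as the source
    for (lo, hi), (vlo, vhi) in zip(block_ranges, valid_ranges):
        result = function(source[lo:hi])
        # Valid ranges are given in source coordinates; shift them into the block.
        sink[vlo:vhi] = result[vlo - lo:vhi - lo]
    return sink


source = [float(i * i % 7) for i in range(20)]

# Blocks of 8 elements overlapping by 4; valid ranges tile the source.
block_ranges = [(0, 8), (4, 12), (8, 16), (12, 20)]
valid_ranges = [(0, 6), (6, 10), (10, 14), (14, 20)]
blocked = process_in_blocks(moving_average, source, block_ranges, valid_ranges)

# Without overlap, elements at block borders miss one neighbour.
no_overlap = [(0, 5), (5, 10), (10, 15), (15, 20)]
naive = process_in_blocks(moving_average, source, no_overlap, no_overlap)

print(blocked == moving_average(source))  # True
print(naive == moving_average(source))    # False
```

With the overlap, every element in a valid range sees the same neighbourhood as in the full source, so the blockwise result matches the result computed on the whole source.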

process_block_block(sources, sinks, function, as_memory=False, return_result=False, verbose=False, **kwargs)

Process a block with full traceback.

Arguments

sources : source specifications

Sources passed to the function.

sinks : source specifications

Sinks where data is written to.

function : function

The function to call.

process_block_source(sources, sinks, function, as_memory=False, as_array=False, verbose=False, **kwargs)

Process a block with full traceback.

Arguments

sources : source specifications

Sources passed to the function.

sinks : source specifications

Sinks where data is written to.

function : function

The function to call.

split_into_blocks(source, processes=None, axes=None, size_max=None, size_min=None, overlap=None, optimization=True, optimization_fix='all', neighbours=False, verbose=False, **kwargs)

Splits a source into a list of Block sources for parallel processing.

The block information is described in ClearMapBlock.

Arguments

source : Source

Source to divide into blocks.

processes : int

Number of parallel processes to use.

axes : int or list of ints or None

Axes along which to split the source. If None, all axes are split.

size_max : int or list of ints

Maximal size of a block along the axes.

size_min : int or list of ints

Minimal size of a block along the axes.

overlap : int or list of ints

Minimal overlap between blocks along the axes.

optimization : bool or list of bools

If True, optimize block sizes to best fit the number of processes.

optimization_fix : ‘increase’, ‘decrease’, ‘all’ or None or list

Increase, decrease or optimally change the block size when optimization is active.

neighbours : bool

If True, also include information about the neighbourhood in the blocks.

verbose : bool

Print information on block generation.

Returns

blocks : list of Blocks

List of Block classes dividing the source.

default_overlap = None

Default overlap between blocks.

Note

This value is used if overlap passed to ClearMap.ParallelProcessing.BlockProcessing.process() is None.

If this is None, a zero overlap will be used.
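
The fallback order described in this note can be sketched as follows. The names are hypothetical: `settings` stands in for the module that holds default_overlap, and `resolve_overlap` is an illustrative helper, not a function of the module.

```python
from types import SimpleNamespace

# Hypothetical stand-in for the module-level setting.
settings = SimpleNamespace(default_overlap=None)


def resolve_overlap(overlap=None):
    """Return the overlap to use, following the documented fallback order."""
    if overlap is None:
        # No explicit value passed: fall back to the module default.
        overlap = settings.default_overlap
    if overlap is None:
        # No module default either: use zero overlap.
        overlap = 0
    return overlap


print(resolve_overlap())      # 0
settings.default_overlap = 5
print(resolve_overlap())      # 5
print(resolve_overlap(2))     # 2
```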

default_size_max = None

Default maximal size of a block.

Note

Set this to limit the maximal block sizes automatically if size_max passed to ClearMap.ParallelProcessing.BlockProcessing.process() is None.

If this is None, the full source size will be used.

default_size_min = None

Default minimal size of a block.

Note

Set this to limit the minimal block sizes automatically if size_min passed to ClearMap.ParallelProcessing.BlockProcessing.process() is None.

If this is None, the full source size will be used.