riogrande.parallel

Block-parallel processing utilities for raster operations.

This module provides worker and aggregation functions that underpin the block-wise parallel processing pipelines in riogrande. Rasters are decomposed into spatial views (see prepare) and each view is processed independently by a pool of worker processes.

Key responsibilities include:

Functions

combine_views(output_params, job_out_q)

Listens to a queue and writes each received view to a file

compute_mask(source, block_size[, nodata, logic, ...])

Compute the mask of a dataset in parallel and write it out to the file

data_writer(writer, writer_params, aggr_q)

Write out data using the context manager writer

fill_matrix(matrix, aggr_q)

Fill a matrix with data received through a queue.

prepare_selector(*bands, block_size[, ...])

Compute a boolean selector from masks of the provided Band objects

process_block(task, source, bands, view, task_params, ...)

Processes a section of the data in the source file.

process_masks(task, bands, view, task_params, ...[, ...])

Processes a section of the mask for each band

runner_call(queue, callback, params[, wrapper])

Call callback with params and put the result into the queue

Module Contents

riogrande.parallel.combine_views(output_params, job_out_q)

Listens to a queue and writes each received view to a file

Parameters:
riogrande.parallel.compute_mask(source, block_size, nodata=0, logic='all', bands=None, verbose=False, **params)

Compute the mask of a dataset in parallel and write it out to the file

This function checks each cell against the nodata value across all selected bands and combines the per-band results according to the chosen logic (via reduced_mask()). The dataset is split into blocks with create_views(), and each block is processed by process_block() in a multiprocessing.pool.Pool obtained via get_or_set_context().

Parameters:
  • source (str or Source) – Path to the tif file or a Source object.

  • block_size (tuple[int, int]) – Size (width, height), in pixels, of the block that a single job processes

  • nodata (int or float) – The nodata value to use for the mask computation

  • logic (str or Callable) –

    Either a string or a callable. Allowed strings are:

    • "any" : Mask each cell where any of the bands matches the nodata value.

    • "all" : Mask each cell where all of the bands match the nodata value.

  • bands (list[Band] or None) – An optional selection of Band objects to use. If not provided all bands are used.

  • verbose (bool) – Print information about each processing step

  • **params (dict) –

    Optional arguments for the multiprocessing:

Return type:

None

See also

prepare_selector()

Build a boolean selector from band masks.

reduced_mask()

Per-block mask computation function used internally.
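
The difference between the "any" and "all" options can be illustrated with a small NumPy sketch. This is not the library's reduced_mask(): the helper name nodata_cells and the (bands, rows, cols) array layout are assumptions made for illustration, and the result simply marks the cells that would be masked as True.

```python
import numpy as np


def nodata_cells(data, nodata=0, logic="all"):
    """Flag cells that match ``nodata`` (hypothetical helper, not riogrande API).

    ``data`` is assumed to have shape (bands, rows, cols).
    """
    hits = data == nodata  # one boolean array per band
    if logic == "any":
        return hits.any(axis=0)  # masked if at least one band matches
    if logic == "all":
        return hits.all(axis=0)  # masked only if every band matches
    raise ValueError(f"unsupported logic: {logic!r}")


# Two bands of a 2x2 block.
block = np.array([
    [[0, 1], [0, 5]],
    [[0, 0], [3, 5]],
])
masked_any = nodata_cells(block, nodata=0, logic="any")
masked_all = nodata_cells(block, nodata=0, logic="all")
```

With "any", three of the four cells are flagged because at least one band holds the nodata value there; with "all", only the top-left cell is flagged.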

riogrande.parallel.data_writer(writer, writer_params, aggr_q)

Write out data using the context manager writer

This function can be used with the various context managers defined in the Source and Band classes, such as mask_writer() or data_writer().

Parameters:
  • writer (Callable) – A data_writer() or mask_writer() context manager from a Source or Band.

  • writer_params (dict) – Keyword arguments that will be passed to the writer method

  • aggr_q (Queue) – The queue this job listens to.

Returns:

Can report the duration of the task.

Return type:

TimedTask

See also

combine_views()

Listener that writes views into a file.

process_block()

Worker that processes and enqueues a data block.
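
The listener pattern described for data_writer() might look roughly like the sketch below. Everything in it is a stand-in: list_writer is a hypothetical context manager that yields a write(view, data) callable, drain_to_writer is not the library function, and the message format (including the {"signal": "kill"} sentinel) is assumed to match the one documented for fill_matrix(). The standard-library queue.Queue is used so the example runs in a single process.

```python
import contextlib
import queue


@contextlib.contextmanager
def list_writer(target):
    """Hypothetical stand-in for a Source/Band writer context manager."""
    def write(view, data):
        target.append((view, data))
    yield write


def drain_to_writer(writer, writer_params, aggr_q):
    """Write queued blocks until a kill signal arrives; return the count."""
    written = 0
    with writer(**writer_params) as write:
        while True:
            item = aggr_q.get()  # blocks until a message is available
            if item.get("signal") == "kill":
                break
            write(item["view"], item["data"])
            written += 1
    return written


store = []
aggr_q = queue.Queue()
aggr_q.put({"view": (0, 0, 2, 2), "data": "block-a"})
aggr_q.put({"view": (2, 0, 2, 2), "data": "block-b"})
aggr_q.put({"signal": "kill"})
count = drain_to_writer(list_writer, {"target": store}, aggr_q)
```

Opening the writer once and keeping it open for the lifetime of the listener means a single job owns the output, so the workers never write concurrently.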

riogrande.parallel.fill_matrix(matrix, aggr_q)

Fill a matrix with data received through a queue.

Each received block is placed into matrix via update_view().

Parameters:
  • matrix (NDArray) – The numpy.ndarray to fill with data.

  • aggr_q (Queue) –

    The multiprocessing.Queue this job listens to. Each element in the queue must be a dict containing either:

    • {"view": ..., "data": ...} specifying where to write what.

    • {"signal": "kill"} to terminate the process and return the filled matrix.

Returns:

The first object is the filled numpy.ndarray, the second holds a TimedTask object with duration information.

Return type:

matrix, (timer,)

See also

update_view()

Write a block into a view of an array.

prepare_selector()

Uses this function as the aggregator job.
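
A minimal sketch of the queue protocol described above, assuming that a view is (x, y, width, height) with x as the column offset and y as the row offset. The function fill_from_queue is illustrative only; it writes blocks by plain slicing rather than through update_view() and omits the timing information that fill_matrix() returns.

```python
import queue

import numpy as np


def fill_from_queue(matrix, aggr_q):
    """Place queued blocks into ``matrix`` until a kill signal arrives."""
    while True:
        item = aggr_q.get()
        if item.get("signal") == "kill":
            return matrix
        # Assumption: view is (x, y, width, height); x indexes columns.
        x, y, width, height = item["view"]
        matrix[y:y + height, x:x + width] = item["data"]


aggr_q = queue.Queue()
aggr_q.put({"view": (0, 0, 2, 2), "data": np.ones((2, 2), dtype=int)})
aggr_q.put({"view": (2, 0, 1, 2), "data": np.full((2, 1), 7)})
aggr_q.put({"signal": "kill"})
filled = fill_from_queue(np.zeros((2, 3), dtype=int), aggr_q)
```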

riogrande.parallel.prepare_selector(*bands, block_size, extra_masking_band=None, verbose=False, **params)

Compute a boolean selector from masks of the provided Band objects

Band masks are aggregated into a boolean selector (via aggregated_selector()) that can be used to identify valid pixels across all provided bands. Optionally, an extra masking band may be applied where values equal to 0 are masked out. The dataset is split into blocks with create_views() and each block is processed by process_masks() in a multiprocessing.pool.Pool obtained via get_or_set_context(). Results are assembled by fill_matrix().

Parameters:
  • bands (Band) – A collection of Band objects specifying which bands to use.

  • block_size (tuple[int, int]) – Size (width, height), in pixels, of the block that a single job processes

  • extra_masking_band (Band or None) – Optional Band object that is treated as a rasterio mask, i.e. values equal to 0 will be masked.

  • verbose (bool) – Print information about each processing step

  • **params (dict) –

    Optional arguments for the multiprocessing:

Returns:

A boolean numpy.ndarray that can be used as selector.

Return type:

NDArray

See also

compute_mask()

Analogous function that writes the mask to file.

aggregated_selector()

Per-block aggregation function used internally.
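
To make the role of block_size concrete, the sketch below tiles a raster into (x, y, width, height) views. The generator tile_views is a hypothetical stand-in for create_views(); the row-major ordering and the clipping of edge blocks are assumptions, and the real function may behave differently.

```python
def tile_views(width, height, block_size):
    """Yield (x, y, w, h) views covering a width x height raster.

    Hypothetical stand-in for create_views(); edge blocks are clipped
    so that no view extends past the raster bounds.
    """
    block_w, block_h = block_size
    for y in range(0, height, block_h):
        for x in range(0, width, block_w):
            yield (x, y, min(block_w, width - x), min(block_h, height - y))


# A 5x3 raster split into blocks of at most 2x2 pixels.
views = list(tile_views(width=5, height=3, block_size=(2, 2)))
```

Smaller blocks produce more jobs with less memory per job; larger blocks reduce scheduling overhead at the cost of a larger working set per worker.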

riogrande.parallel.process_block(task, source, bands, view, task_params, read_params, open_params, out_q)

Processes a section of the data in the source file.

This is a general purpose function that can be used to process a large .tif in a parallelized manner.

The view is converted to a rasterio.windows.Window via view_to_window(), and the result is enqueued using runner_call().

Parameters:
  • task (Callable) – Function that will be called on the data from the specified band. The first argument of the function must be data, a numpy.ndarray that holds the data from this section.

  • source (str or Source) – Either a string or a Source object.

  • bands (Collection[Band] or None) – A collection of Band objects specifying which bands to use.

  • view (tuple[int, int, int, int]) – A tuple (x, y, width, height) defining the view of data to extract and process.

  • task_params (dict) – Keyword arguments that will be passed to the callable task

  • read_params (dict) – Keyword arguments that are passed to the reader method of the source object

  • open_params (dict) – Keyword arguments that are passed to the open method of the source object

  • out_q (Queue) – The queue this job will put the output of the callable task into

Returns:

Can report the duration of the task.

Return type:

TimedTask

See also

process_masks()

Analogous function operating on band masks.

runner_call()

Helper that enqueues the callback result.
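
The worker pattern (extract a view, apply task, enqueue the result) can be sketched without any raster I/O. The example below makes several simplifying assumptions: a nested list stands in for the raster, a thread pool stands in for the multiprocessing pool, and process_view and scale are hypothetical names that are not part of riogrande.

```python
import queue
from concurrent.futures import ThreadPoolExecutor


def process_view(task, grid, view, task_params, out_q):
    """Apply ``task`` to one view of ``grid`` and enqueue the result."""
    x, y, width, height = view
    block = [row[x:x + width] for row in grid[y:y + height]]
    result = task(block, **task_params)  # data is the first argument
    out_q.put({"view": view, "data": result})


def scale(data, factor=1):
    """Hypothetical task: multiply every value in the block."""
    return [[value * factor for value in row] for row in data]


grid = [[1, 2, 3, 4],
        [5, 6, 7, 8]]
views = [(0, 0, 2, 2), (2, 0, 2, 2)]
out_q = queue.Queue()

with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [
        pool.submit(process_view, scale, grid, view, {"factor": 10}, out_q)
        for view in views
    ]
    for future in futures:
        future.result()  # re-raise any worker exception

# Results may arrive in any order, so index them by view.
results = {}
while not out_q.empty():
    item = out_q.get()
    results[item["view"]] = item["data"]
```

Because each message carries its own view, the consumer does not depend on the order in which workers finish.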

riogrande.parallel.process_masks(task, bands, view, task_params, read_params, open_params, aggr_q, extra_masking_band=None)

Processes a section of the mask for each band

This is a general purpose function that can be used to process a large .tif in a parallelized manner.

The view is converted to a rasterio.windows.Window via view_to_window(), and the result is enqueued using runner_call().

Parameters:
  • task (Callable) – Function that will be called on the masks of the specified bands. The first argument of the function must be masks, a list of numpy.ndarray holding the masks from this section.

  • bands (Collection[Band]) – A collection of Band objects specifying which bands to use. Their mask readers are obtained via get_mask_reader().

  • view (tuple[int, int, int, int]) – A tuple (x, y, width, height) defining the view of data to extract and process.

  • task_params (dict) – Keyword arguments that will be passed to the callable task

  • read_params (dict) – Keyword arguments that are passed to the reader method of the source object

  • open_params (dict) – Keyword arguments that are passed to the open method of the source object

  • aggr_q (Queue) – The queue this job will put the output of the callable task into

  • extra_masking_band (Band or None) –

    Optional Band object that is treated as a rasterio mask, i.e. values equal to 0 are masked.

    Warning

    This Band is treated as a mask itself, its own mask is ignored.

Returns:

Can report the duration of the task.

Return type:

TimedTask

See also

process_block()

Analogous function operating on band data.

runner_call()

Helper that enqueues the callback result.
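
A task passed to process_masks() receives a list of mask arrays. The sketch below shows one plausible such task; it is not the library's aggregated_selector(). The name combine_masks and the extra keyword are hypothetical, and the masks are assumed to follow the rasterio convention in which 0 marks a masked cell and any non-zero value (typically 255) marks a valid one.

```python
import numpy as np


def combine_masks(masks, extra=None):
    """Return True where every mask is non-zero (hypothetical task).

    ``extra`` holds the *data* of an extra masking band; cells equal
    to 0 are masked, and the band's own mask plays no role.
    """
    selector = np.logical_and.reduce([mask != 0 for mask in masks])
    if extra is not None:
        selector &= extra != 0
    return selector


masks = [
    np.array([[255, 0], [255, 255]], dtype=np.uint8),
    np.array([[255, 255], [0, 255]], dtype=np.uint8),
]
extra = np.array([[1, 1], [1, 0]])

valid = combine_masks(masks)
valid_with_extra = combine_masks(masks, extra=extra)
```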

riogrande.parallel.runner_call(queue, callback, params, wrapper=None)

Call callback with params and put the result into the queue

The function calls callback(**params), and optionally passes the result through a wrapper function if provided.

Parameters:
  • queue (Queue[Any]) – A multiprocessing.Queue into which the result (wrapped or unwrapped) will be placed.

  • callback (Callable) – A callable object (function or method) to be executed.

  • params (dict) – A dictionary of keyword arguments passed to the callback.

  • wrapper (Callable or None) – A function applied to the callback result before it is placed into the queue. If None, the raw result is placed into the queue.

Returns:

The unwrapped result of callback.

Return type:

dict

See also

process_block()

Uses this function to enqueue block results.

process_masks()

Uses this function to enqueue mask results.
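
The behaviour described for runner_call() (call callback(**params), optionally wrap the result, enqueue it, and return the unwrapped result) can be sketched as follows. The function call_and_enqueue is an illustrative reimplementation based only on that description, not the library source, and block_sum is a hypothetical callback.

```python
import queue


def call_and_enqueue(out_q, callback, params, wrapper=None):
    """Run ``callback(**params)`` and enqueue the (optionally wrapped) result."""
    result = callback(**params)
    out_q.put(result if wrapper is None else wrapper(result))
    return result  # the caller always gets the unwrapped result


def block_sum(values):
    """Hypothetical callback returning a dict."""
    return {"total": sum(values)}


out_q = queue.Queue()
returned = call_and_enqueue(
    out_q,
    block_sum,
    {"values": [1, 2, 3]},
    # The wrapper attaches the view so an aggregator knows where the data belongs.
    wrapper=lambda res: {"view": (0, 0, 1, 3), "data": res},
)
queued = out_q.get()
```

Separating the wrapper from the callback keeps the task function unaware of the queue message format.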