riogrande.parallel
Block-parallel processing utilities for raster operations.
This module provides worker and aggregation functions that underpin the
block-wise parallel processing pipelines in riogrande. Rasters are
decomposed into spatial views (see prepare) and each view
is processed independently by a pool of worker processes.
Key responsibilities include:
Block processing: Reading a spatial block from a raster source and applying a task function (process_block(), process_masks()).
Result aggregation: Writing processed blocks back into an output raster (combine_views(), fill_matrix(), data_writer()).
Mask computation: Computing a combined boolean selector mask across multiple bands in parallel (compute_mask(), prepare_selector()).
Queue-based runners: A generic callback wrapper for multiprocessing queues (runner_call()).
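The decomposition into views can be pictured with a minimal sketch. The helper `make_views` below is hypothetical and written only for illustration; it is not riogrande's own view-creation function, whose signature is not shown on this page. It assumes that a view is a tuple (x, y, width, height) and that blocks on the right and bottom edges are clipped to the raster bounds.

```python
def make_views(width, height, block_size):
    """Split a width x height raster into (x, y, w, h) views (illustrative only)."""
    block_w, block_h = block_size
    views = []
    for y in range(0, height, block_h):
        for x in range(0, width, block_w):
            # Clip blocks at the right/bottom edge to the raster bounds.
            w = min(block_w, width - x)
            h = min(block_h, height - y)
            views.append((x, y, w, h))
    return views


# A 5 x 4 raster cut into blocks of at most 2 x 3 pixels.
views = make_views(width=5, height=4, block_size=(2, 3))
print(views)
```

Because the views do not overlap and together cover the whole raster, each one can be handed to a separate worker without coordination between workers.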
Functions
| combine_views() | Listens to a queue and writes provided view into a file |
| compute_mask() | Compute the mask of a dataset in parallel and write it out to the file |
| data_writer() | Write out data using the context manager writer |
| fill_matrix() | Fill a matrix with data received through a queue. |
| prepare_selector() | Compute a boolean selector from masks of the provided Band objects |
| process_block() | Processes a section of the data in the source file. |
| process_masks() | Processes a section of the mask for each band |
| runner_call() | Put the results of callback using parameter into the queue |
Module Contents
- riogrande.parallel.combine_views(output_params, job_out_q)
Listens to a queue and writes each provided view into a file.
- Parameters:
output_params (dict)
job_out_q (multiprocessing.Queue)
- riogrande.parallel.compute_mask(source, block_size, nodata=0, logic='all', bands=None, verbose=False, **params)
Compute the mask of a dataset in parallel and write it out to the file.
This function checks validity against the nodata rule across all bands, according to the selected logic (via reduced_mask()). The dataset is split into blocks with create_views() and each block is processed by process_block() in a multiprocessing.pool.Pool obtained via get_or_set_context().
- Parameters:
source (str or Source) – Path to the tif file or a Source object.
block_size (tuple[int, int]) – Size (width, height), in pixels, of the block that a single job processes.
nodata (int or float) – The nodata value to use for mask computation.
logic (str) – Either a string or a callable. Allowed strings are:
"any": Mask each cell where any of the bands matches the nodata value.
"all": Mask each cell where all of the bands match the nodata value.
bands (list[Band] or None) – An optional selection of Band objects to use. If not provided, all bands are used.
verbose (bool) – Print out information about the processing steps.
**params (dict) – Optional arguments for the multiprocessing:
nbrcpu (int) – Number of CPUs to use, passed to get_nbr_workers().
start_method (str) – Starting method for multiprocessing jobs, passed to get_or_set_context().
- Return type:
None
See also
prepare_selector() – Build a boolean selector from band masks.
reduced_mask() – Per-block mask computation function used internally.
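The difference between the "any" and "all" logic can be illustrated with a small, self-contained sketch. The function `nodata_mask` is hypothetical and works on plain nested lists; it is not the library's reduced_mask(), and it ignores the block-wise, parallel execution. It assumes that True marks a masked cell, which may differ from the convention used in the written mask.

```python
def nodata_mask(bands, nodata=0, logic="all"):
    """Return a per-cell mask: True where the cell counts as nodata.

    `bands` is a list of equally shaped 2D lists, one per band (illustrative).
    """
    combine = {"any": any, "all": all}[logic]
    rows, cols = len(bands[0]), len(bands[0][0])
    return [
        [combine(band[r][c] == nodata for band in bands) for c in range(cols)]
        for r in range(rows)
    ]


band_1 = [[0, 5], [0, 7]]
band_2 = [[0, 0], [3, 9]]

# "all": masked only where every band equals nodata.
mask_all = nodata_mask([band_1, band_2], nodata=0, logic="all")
# "any": masked as soon as one band equals nodata.
mask_any = nodata_mask([band_1, band_2], nodata=0, logic="any")
```

With these inputs, only the top-left cell is masked under "all", while under "any" every cell except the bottom-right one is masked.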
- riogrande.parallel.data_writer(writer, writer_params, aggr_q)
Write out data using the context manager writer.
This function can be used with the various context managers defined in the Source and Band classes, such as mask_writer() or data_writer().
- Parameters:
writer (Callable) – A data_writer() or mask_writer() context manager from a Source or Band.
writer_params (dict) – Keyword arguments that will be passed to the writer method.
aggr_q (Queue) – The queue this job listens to.
- Returns:
Can report the duration of the task.
- Return type:
See also
combine_views() – Listener that writes views into a file.
process_block() – Worker that processes and enqueues a data block.
- riogrande.parallel.fill_matrix(matrix, aggr_q)
Fill a matrix with data received through a queue.
Each received block is placed into matrix via update_view().
- Parameters:
matrix (NDArray) – The numpy.ndarray to fill with data.
aggr_q (Queue) – The multiprocessing.Queue this job listens to. Each element in the queue must be a dict containing either:
{"view": ..., "data": ...} specifying where to write what.
{"signal": "kill"} to terminate the process and return the filled matrix.
- Returns:
The first object is the filled numpy.ndarray, the second holds a TimedTask object with duration information.
- Return type:
matrix, (timer,)
See also
update_view() – Write a block into a view of an array.
prepare_selector() – Uses this function as the aggregator job.
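The queue protocol described above can be sketched as a simple consumer loop. This is an illustration under stated assumptions, not the library's implementation: `fill_from_queue` is a hypothetical name, the matrix is a nested list rather than a numpy.ndarray, the view is assumed to be (x, y, width, height), the timer in the return value is omitted, and a thread-safe `queue.Queue` stands in for `multiprocessing.Queue` so that the example runs in a single process.

```python
import queue


def fill_from_queue(matrix, aggr_q):
    """Consume messages until a kill signal arrives, then return the matrix."""
    while True:
        message = aggr_q.get()
        if message.get("signal") == "kill":
            return matrix
        x, y, width, height = message["view"]
        block = message["data"]
        for row in range(height):
            # Copy one row of the block into its place in the matrix.
            matrix[y + row][x:x + width] = block[row]


aggr_q = queue.Queue()
aggr_q.put({"view": (0, 0, 2, 1), "data": [[1, 2]]})
aggr_q.put({"view": (1, 1, 2, 1), "data": [[3, 4]]})
aggr_q.put({"signal": "kill"})

filled = fill_from_queue([[0, 0, 0], [0, 0, 0]], aggr_q)
```

Sending the kill signal last matters in this sketch: any block enqueued after it would never be read.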
- riogrande.parallel.prepare_selector(*bands, block_size, extra_masking_band=None, verbose=False, **params)
Compute a boolean selector from masks of the provided Band objects.
Band masks are aggregated into a boolean selector (via aggregated_selector()) that can be used to identify valid pixels across all provided bands. Optionally, an extra masking band may be applied where values equal to 0 are masked out. The dataset is split into blocks with create_views() and each block is processed by process_masks() in a multiprocessing.pool.Pool obtained via get_or_set_context(). Results are assembled by fill_matrix().
- Parameters:
bands (Band) – A collection of Band objects specifying which bands to use.
block_size (tuple[int, int]) – Size (width, height), in pixels, of the block that a single job processes.
extra_masking_band (Band or None) – Optional Band object that is treated as a rasterio mask, i.e. values equal to 0 will be masked.
verbose (bool) – Print out information about the processing steps.
**params (dict) – Optional arguments for the multiprocessing:
nbrcpu (int) – The number of CPUs to use, passed to get_nbr_workers().
start_method (str) – Starting method for multiprocessing jobs, passed to get_or_set_context().
- Returns:
A boolean numpy.ndarray that can be used as selector.
- Return type:
NDArray
See also
compute_mask() – Analogous function that writes the mask to file.
aggregated_selector() – Per-block aggregation function used internally.
- riogrande.parallel.process_block(task, source, bands, view, task_params, read_params, open_params, out_q)
Processes a section of the data in the source file.
This is a general-purpose function that can be used to process a large .tif in a parallelized manner.
The view is converted to a rasterio.windows.Window via view_to_window(), and the result is enqueued using runner_call().
- Parameters:
task (Callable) – Function that will be called on the data from the specified band. The first argument of the function must be data, a numpy.ndarray that holds the data from this section.
source (str or Source) – Either a string or a Source object.
bands (Collection[Band] or None) – A collection of Band objects specifying which bands to use.
view (tuple[int, int, int, int]) – A tuple (x, y, width, height) defining the view of data to extract and process.
task_params (dict) – Keyword arguments that will be passed to the callable task
read_params (dict) – Keyword arguments that are passed to the open method of the source object
open_params (dict) – Keyword arguments that are passed to the reader method of the source object
out_q (Queue) – The queue this job will put the output of the callable task into
- Returns:
Can report the duration of the task.
- Return type:
See also
process_masks() – Analogous function operating on band masks.
runner_call() – Helper that enqueues the callback result.
- riogrande.parallel.process_masks(task, bands, view, task_params, read_params, open_params, aggr_q, extra_masking_band=None)
Processes a section of the mask for each band.
This is a general-purpose function that can be used to process a large .tif in a parallelized manner.
The view is converted to a rasterio.windows.Window via view_to_window(), and the result is enqueued using runner_call().
- Parameters:
task (Callable) – Function that will be called on the masks of the specified bands. The first argument of the function must be masks, a list of numpy.ndarray holding the masks from this section.
bands (Collection[Band]) – A collection of Band objects specifying which bands to use. Their mask readers are obtained via get_mask_reader().
view (tuple[int, int, int, int]) – A tuple (x, y, width, height) defining the view of data to extract and process.
task_params (dict) – Keyword arguments that will be passed to the callable task
read_params (dict) – Keyword arguments that are passed to the open method of the source object
open_params (dict) – Keyword arguments that are passed to the reader method of the source object
aggr_q (Queue) – The queue this job will put the output of the callable task into
extra_masking_band (Band or None) – Optional Band object that is treated as a rasterio mask, i.e. values equal to 0 are masked.
Warning
This Band is treated as a mask itself; its own mask is ignored.
- Returns:
Can report the duration of the task.
- Return type:
See also
process_block() – Analogous function operating on band data.
runner_call() – Helper that enqueues the callback result.
- riogrande.parallel.runner_call(queue, callback, params, wrapper=None)
Put the result of calling callback with params into the queue.
The function calls callback(**params), and optionally passes the result through a wrapper function if provided.
- Parameters:
queue (Queue[Any]) – A multiprocessing.Queue into which the result (wrapped or unwrapped) will be placed.
callback (Callable) – A callable object (function or method) to be executed.
params (dict) – A dictionary of keyword arguments passed to the callback.
wrapper (Callable or None) – A function applied to the callback result before it is placed into the queue. If None, the raw result is placed into the queue.
- Returns:
The unwrapped output.
- Return type:
See also
process_block() – Uses this function to enqueue block results.
process_masks() – Uses this function to enqueue mask results.
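The behaviour described for runner_call() can be mirrored in a few lines. The following is a sketch based only on the description above, not the library's source: `runner_call_sketch` and `add` are hypothetical names, and `queue.Queue` is used in place of `multiprocessing.Queue` so that the example runs in a single process.

```python
import queue


def runner_call_sketch(out_q, callback, params, wrapper=None):
    """Call callback(**params) and enqueue the (optionally wrapped) result."""
    output = callback(**params)
    out_q.put(output if wrapper is None else wrapper(output))
    # The documentation states that the unwrapped output is returned.
    return output


def add(a, b):
    """Toy callback used only for this illustration."""
    return a + b


out_q = queue.Queue()
returned = runner_call_sketch(
    out_q, add, {"a": 2, "b": 3}, wrapper=lambda value: {"data": value}
)
queued = out_q.get()
```

Here the wrapper packages the raw result into a dict before it is enqueued, which is one way a worker could produce the message shape that an aggregator expects, while the caller still receives the raw value.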