"""
Block-parallel processing utilities for raster operations.
This module provides worker and aggregation functions that underpin the
block-wise parallel processing pipelines in ``riogrande``. Rasters are
decomposed into spatial views (see :mod:`~riogrande.prepare`) and each view
is processed independently by a pool of worker processes.
Key responsibilities include:
- **Block processing**: Reading a spatial block from a raster source and
applying a task function (:func:`process_block`, :func:`process_masks`).
- **Result aggregation**: Writing processed blocks back into an output raster
(:func:`combine_views`, :func:`fill_matrix`, :func:`data_writer`).
- **Mask computation**: Computing a combined boolean selector mask across
multiple bands in parallel (:func:`compute_mask`, :func:`prepare_selector`).
- **Queue-based runners**: A generic callback wrapper for multiprocessing queues
(:func:`runner_call`).
"""
from __future__ import annotations
import warnings
from typing import Any
from copy import copy
from collections.abc import Callable, Collection
from multiprocessing import (Queue, Manager)
import numpy as np
from numpy.typing import NDArray
from .io import Source, Band
from .helper import (view_to_window,
reduced_mask,
aggregated_selector,
get_or_set_context,
get_nbr_workers, )
from .prepare import create_views, update_view
from .timing import TimedTask
[docs]
def combine_views(output_params: dict, job_out_q: Queue) -> TimedTask:
    """Listen to a queue and write every received view into an output band.

    Parameters
    ----------
    output_params : dict
        Settings describing the output. The keys ``'output_file'``,
        ``'profile'``, ``'band'`` and ``'tags'`` are required and are removed
        from the dictionary (they are read with ``pop``). ``'verbose'`` is
        optional and defaults to ``False``.
        If ``'band'`` is ``None`` a :class:`~riogrande.io.models.Band`
        (``bidx=1``) pointing to ``'output_file'`` is created.
    job_out_q : Queue
        The queue this job listens to. Each item must be a ``dict`` holding
        either ``'data'`` and ``'view'`` (the block to write and its position)
        or ``{'signal': 'kill'}`` to stop listening.

    Returns
    -------
    :class:`~riogrande.timing.TimedTask`
        Can report the duration of the task.
    """
    with TimedTask() as timer:
        # NOTE: ``pop`` mutates the dictionary handed in by the caller.
        output_file = output_params.pop('output_file')
        profile = output_params.pop('profile')
        out_band = output_params.pop('band')
        out_tag = output_params.pop('tags')
        verbose = output_params.get('verbose', False)
        if out_band is None:
            # NOTE(review): the nesting of the next statements could not be
            # verified in the reviewed listing; it is assumed that the source
            # is only initialised (and the tags exported) when the band is
            # created here - confirm against the repository.
            out_band = Band(source=Source(path=output_file),
                            bidx=1,
                            tags=out_tag)  # create the file
            out_band.init_source(profile=profile)
            out_band.export_tags()
        # write until done, i.e. until the kill signal is received
        with out_band.data_writer(mode='r+') as write:
            while True:
                output = job_out_q.get()
                signal = output.get('signal', None)
                if signal:
                    if signal == "kill":
                        if verbose:
                            print(f"\n\nClosing: {out_band.source.path}\n\n")
                        break
                # Any item that is not the kill signal is expected to carry
                # both 'data' and 'view'; otherwise ``pop`` raises KeyError.
                data = output.pop('data')
                view = copy(output.pop('view'))
                w = view_to_window(view)
                write(data, window=w)
                if verbose:
                    print(f"Wrote out block {view=}")
                # presumably marks one lap per written block - TODO confirm
                timer.new_lab()
    return timer
[docs]
def data_writer(writer: Callable, writer_params: dict, aggr_q: Queue) -> TimedTask:
    """Drain a queue and write each received block with the `writer` context manager.

    The function is meant to be used with the writer context managers of
    :class:`~riogrande.io.models.Source` and :class:`~riogrande.io.models.Band`,
    e.g. :meth:`~riogrande.io.models.Source.mask_writer` or
    :meth:`~riogrande.io.models.Band.data_writer`.

    Parameters
    ----------
    writer : Callable
        Context manager factory. It is called as ``writer(**writer_params)``
        and must yield a callable accepting ``(data, window=...)``.
    writer_params : dict
        Keyword arguments that are passed to `writer`.
    aggr_q : Queue
        The queue this job listens to. Each item must be a ``dict`` holding
        either ``'data'`` and ``'view'`` or ``{'signal': 'kill'}`` to stop.

    Returns
    -------
    :class:`~riogrande.timing.TimedTask`
        Can report the duration of the task.

    See Also
    --------
    :func:`~riogrande.parallel.combine_views` : Listener that writes views into a file.
    :func:`~riogrande.parallel.process_block` : Worker that processes and enqueues a data block.
    """
    with TimedTask() as timer:
        with writer(**writer_params) as write_block:
            # Keep consuming until the kill signal shows up.
            while (message := aggr_q.get()).get('signal', None) != "kill":
                block = message.pop('data')
                block_view = copy(message.pop('view'))
                write_block(block, window=view_to_window(block_view))
                timer.new_lab()
    return timer
[docs]
def process_block(task: Callable, source: str | Source, bands: Collection[Band] | None, view: tuple[int, int, int, int],
                  task_params: dict, read_params: dict, open_params: dict, out_q: Queue) -> TimedTask:
    """Process a section of the data in the source file.

    This is a general purpose function that can be used to process a large .tif
    in a parallelized manner.
    The view is converted to a :class:`rasterio.windows.Window` via
    :func:`~riogrande.helper.view_to_window`, and the result is enqueued
    using :func:`~riogrande.parallel.runner_call` as
    ``dict(data=<task result>, view=view)``.

    Parameters
    ----------
    task : Callable
        Function that will be called on the data of this section as
        ``task(array=<data>, **task_params)``, i.e. it must accept the
        data through the keyword argument ``array``.
    source : str or Source
        Either a string or a :class:`~riogrande.io.models.Source` object.
        A value that is not a ``Source`` is used as path of a new ``Source``.
    bands : Collection[Band] or None
        A collection of :class:`~riogrande.io.models.Band` objects specifying
        which bands to use. If ``None`` all bands of `source` are used and
        a warning is emitted.
    view : tuple[int, int, int, int]
        A tuple (x, y, width, height) defining the view of data to extract and
        process.
    task_params : dict
        Keyword arguments that will be passed to the callable `task`.
    read_params : dict
        Keyword arguments that are passed, along with the window, to the
        read callable provided by the data reader of `source`.
    open_params : dict
        Keyword arguments that are passed, along with `bands`, to the
        ``data_reader`` method of `source`.
    out_q : Queue
        The queue this job will put the output of the callable `task` into.

    Returns
    -------
    :class:`~riogrande.timing.TimedTask`
        Can report the duration of the task.

    Raises
    ------
    ValueError
        If one of the bands does not point to `source`.

    See Also
    --------
    :func:`~riogrande.parallel.process_masks` : Analogous function operating on band masks.
    :func:`~riogrande.parallel.runner_call` : Helper that enqueues the callback result.
    """
    with TimedTask() as timer:
        if not isinstance(source, Source):
            source = Source(path=source)
        if bands is None:
            bands = source.get_bands()
            warnings.warn(
                "No specific bands selected, using all",
                stacklevel=2,
            )
        # Explicit check instead of ``assert`` so that the validation is not
        # stripped when Python runs with -O.
        if not all(band.source == source for band in bands):
            raise ValueError("Not all bands point to the correct source!")
        window = view_to_window(view)
        with source.data_reader(bands=bands, **open_params) as read:
            data = read(window=window, **read_params)
        runner_call(callback=task,
                    params=dict(array=data, **task_params),
                    queue=out_q,
                    wrapper=lambda result: dict(data=result, view=view))
    return timer
[docs]
def process_masks(task: Callable, bands: Collection[Band], view: tuple[int, int, int, int],
                  task_params: dict, read_params: dict, open_params: dict, aggr_q: Queue,
                  extra_masking_band: Band | None = None) -> TimedTask:
    """Process a section of the mask of each band.

    The masks of all `bands` are read for the window matching `view`
    (see :func:`~riogrande.helper.view_to_window`), handed to `task` and
    the result is enqueued with :func:`~riogrande.parallel.runner_call` as
    ``dict(data=<task result>, view=view)``.

    Parameters
    ----------
    task : Callable
        Function that will be called as ``task(masks=<list>, **task_params)``
        where ``masks`` is the list of masks read for this section, one per
        band (plus the data of `extra_masking_band` if provided).
    bands : Collection[Band]
        A collection of :class:`~riogrande.io.models.Band` objects specifying
        which bands to use. Their mask readers are obtained via
        :meth:`~riogrande.io.models.Band.get_mask_reader`.
    view : tuple[int, int, int, int]
        A tuple (x, y, width, height) defining the view of data to extract and
        process.
    task_params : dict
        Keyword arguments that will be passed to the callable `task`.
    read_params : dict
        Keyword arguments that are passed, along with the window, to the
        read callable provided by each mask reader.
    open_params : dict
        Keyword arguments that are passed to each mask reader when it is
        opened.
    aggr_q : Queue
        The queue this job will put the output of the callable `task` into.
    extra_masking_band : Band or None
        Optional :class:`~riogrande.io.models.Band` object that is treated as
        a rasterio mask, i.e. values equal to 0 are masked. Its data is read
        without `open_params` and `read_params`.

        .. warning::
            This Band is treated as a mask itself, its own mask is ignored.

    Returns
    -------
    :class:`~riogrande.timing.TimedTask`
        Can report the duration of the task.

    See Also
    --------
    :func:`~riogrande.parallel.process_block` : Analogous function operating on band data.
    :func:`~riogrande.parallel.runner_call` : Helper that enqueues the callback result.
    """
    window = view_to_window(view)

    def _with_view(result):
        # Attach the view so the aggregator knows where the block belongs.
        return dict(data=result, view=view)

    with TimedTask() as timer:
        masks = []
        for band in bands:
            open_mask = band.get_mask_reader()
            with open_mask(**open_params) as read_mask:
                masks.append(read_mask(window=window, **read_params))
        # The data of the extra masking band acts as one more mask.
        if extra_masking_band is not None:
            with extra_masking_band.data_reader() as read_band:
                masks.append(read_band(window=window))
        runner_call(callback=task,
                    params=dict(masks=masks, **task_params),
                    queue=aggr_q,
                    wrapper=_with_view)
    return timer
[docs]
def runner_call(queue: Queue[Any], callback: Callable, params: dict, wrapper: Callable | None = None) -> Any:
    """Put the result of ``callback(**params)`` into the queue.

    The function calls ``callback(**params)`` and places the result into
    `queue`, optionally passing it through `wrapper` first.

    Parameters
    ----------
    queue : Queue[Any]
        A :class:`multiprocessing.Queue` (or any object with a compatible
        ``put`` method) into which the result (wrapped or unwrapped) will be
        placed.
    callback : Callable
        A callable object (function or method) to be executed.
    params : dict
        A dictionary of keyword arguments passed to the callback.
    wrapper : Callable or None
        A function applied to the callback result before it is placed into the
        queue. If ``None``, the raw result is placed into the queue.

    Returns
    -------
    Any
        The unwrapped output of `callback`, whatever its type.

    See Also
    --------
    :func:`~riogrande.parallel.process_block` : Uses this function to enqueue block results.
    :func:`~riogrande.parallel.process_masks` : Uses this function to enqueue mask results.
    """
    output = callback(**params)
    # Only the queued item is wrapped; the caller always gets the raw output.
    queue.put(output if wrapper is None else wrapper(output))
    return output
[docs]
def compute_mask(source: str | Source, block_size: tuple[int, int], nodata=0, logic: str = 'all',
                 bands: list[Band] | None = None, verbose: bool = False, **params) -> None:
    """Compute the mask of a dataset in parallel and write it out to the file.

    The validity of each cell is checked against the nodata rule across the
    bands using the selected logic (via :func:`~riogrande.helper.reduced_mask`).
    The dataset is split into blocks with :func:`~riogrande.prepare.create_views`
    and each block is processed by :func:`~riogrande.parallel.process_block` in a
    :class:`multiprocessing.pool.Pool` obtained via :func:`~riogrande.helper.get_or_set_context`.
    The resulting blocks are written by :func:`~riogrande.parallel.data_writer`
    using the ``mask_writer`` of `source`.

    Parameters
    ----------
    source : str or :class:`~riogrande.io.models.Source`
        Path to the tif file or a Source object.
    block_size : tuple[int, int]
        Size (width, height) in #pixel of the block that a single job processes.
    nodata : int or float
        Supply nodata value to use for mask computation.
    logic : str
        Forwarded unchanged to :func:`~riogrande.helper.reduced_mask`.
        Allowed strings are:

        - ``"any"`` : Mask each cell where *any* of the bands matches the nodata value.
        - ``"all"`` : Mask each cell where *all* of the bands match the nodata value.
    bands : list[Band] or None
        An optional selection of :class:`~riogrande.io.models.Band` objects to use.
        If not provided all bands are used.
    verbose : bool
        Print out processing step infos.
    **params : dict
        Optional arguments for the multiprocessing:

        - ``nbrcpu`` : int
            Number of CPUs to use, passed to :func:`~riogrande.helper.get_nbr_workers`.
        - ``start_method`` : str
            Starting method for multiprocessing jobs, passed to
            :func:`~riogrande.helper.get_or_set_context`.

    Returns
    -------
    None

    See Also
    --------
    :func:`~riogrande.parallel.prepare_selector` : Build a boolean selector from band masks.
    :func:`~riogrande.helper.reduced_mask` : Per-block mask computation function used internally.
    """
    if verbose:
        print(f'compute_mask - {source=}')
    if isinstance(source, str):
        source = Source(path=source)
    source.import_profile()
    width = source.profile.get('width')
    height = source.profile.get('height')
    if bands is None:
        bands = source.get_bands()
    else:
        for band in bands:
            band.source.import_profile()

    # set the per-block parameter
    _, inner_views = create_views(view_size=block_size,
                                  border=(0, 0),
                                  size=(width, height))
    block_params = [
        dict(task=reduced_mask,
             source=source,
             bands=bands,
             view=view,
             task_params=dict(nodata=nodata, logic=logic),
             open_params=dict(mode='r'),
             read_params=dict())
        for view in inner_views
    ]

    # prepare multiprocessing
    start_method = params.get('start_method', None)
    nbr_workers = get_nbr_workers(number=params.get('nbrcpu', None))
    # The aggregator occupies one worker until it receives the kill signal,
    # so a pool with a single worker would never start a block job.
    # (assumes get_nbr_workers returns an int or None)
    if nbr_workers is not None and nbr_workers < 2:
        nbr_workers = 2
    if verbose:
        print(f"using {nbr_workers=}")

    # The manager runs its own server process; the context manager makes sure
    # it is shut down again, also if a job fails.
    with Manager() as manager:
        aggr_q = manager.Queue()
        with get_or_set_context(start_method).Pool(nbr_workers) as pool:
            # start the aggregator task first so that it gets a worker
            aggregator_job = pool.apply_async(
                func=data_writer,
                kwds=dict(
                    writer=source.mask_writer,
                    writer_params=dict(mode='r+'),
                    aggr_q=aggr_q,
                ),
            )
            # start the block jobs
            block_jobs = [
                pool.apply_async(func=process_block,
                                 kwds=dict(**bparams, out_q=aggr_q))
                for bparams in block_params
            ]
            # wait for all blocks; ``get`` re-raises errors from the workers
            for job in block_jobs:
                job.get()
            # all blocks are queued, tell the aggregator to finish
            aggr_q.put(dict(signal='kill'))
            pool.close()
            pool.join()
            # surface errors that occurred while writing
            aggregator_job.get()
[docs]
def fill_matrix(matrix: NDArray, aggr_q: Queue) -> tuple[NDArray | None, tuple]:
    """Fill a matrix with the blocks received through a queue.

    Every received block is placed into `matrix` via
    :func:`~riogrande.prepare.update_view`.

    Parameters
    ----------
    matrix : NDArray
        The :class:`numpy.ndarray` to fill with data.
    aggr_q : Queue
        The :class:`multiprocessing.Queue` this job listens to. Each element
        in the queue must be a ``dict`` containing either:

        - ``{"view": ..., "data": ...}`` specifying where to write what.
        - ``{"signal": "kill"}`` to stop listening and return the filled matrix.

    Returns
    -------
    matrix, (timer,):
        The first object is the filled :class:`numpy.ndarray`, the second holds a
        :class:`~riogrande.timing.TimedTask` object with duration information.

    See Also
    --------
    :func:`~riogrande.prepare.update_view` : Write a block into a view of an array.
    :func:`~riogrande.parallel.prepare_selector` : Uses this function as the aggregator job.
    """
    with TimedTask() as timer:
        # Consume queue items until the kill signal arrives.
        while (message := aggr_q.get()).get('signal', None) != "kill":
            target_view = message.pop('view')
            block = message.pop('data')
            update_view(data=matrix, view=target_view, block=block)
            timer.new_lab()
    return matrix, (timer,)
[docs]
def prepare_selector(*bands: Band, block_size: tuple[int, int], extra_masking_band: Band | None = None,
                     verbose=False, **params) -> NDArray:
    """Compute a boolean selector from masks of the provided :class:`~riogrande.io.models.Band` objects.

    Band masks are aggregated into a boolean selector (via
    :func:`~riogrande.helper.aggregated_selector`, called with ``logic='all'``)
    that can be used to identify valid pixels across all provided bands.
    Optionally, an extra masking band may be applied where values equal to 0
    are masked out.
    The dataset is split into blocks with :func:`~riogrande.prepare.create_views`
    and each block is processed by :func:`~riogrande.parallel.process_masks` in a
    :class:`multiprocessing.pool.Pool` obtained via :func:`~riogrande.helper.get_or_set_context`.
    Results are assembled by :func:`~riogrande.parallel.fill_matrix`.

    Parameters
    ----------
    bands : Band
        One or more :class:`~riogrande.io.models.Band` objects specifying
        which bands to use. The width and height are taken from the profile
        of the first band's source.
    block_size : tuple[int, int]
        Size (width, height) in #pixel of the block that a single job processes.
    extra_masking_band : Band or None
        Optional :class:`~riogrande.io.models.Band` object that is treated as a rasterio
        mask, i.e. values equal to 0 will be masked.
    verbose : bool
        Print out processing step infos.
    **params : dict
        Optional arguments for the multiprocessing:

        - ``nbrcpu`` : int
            The number of CPUs to use, passed to :func:`~riogrande.helper.get_nbr_workers`.
        - ``start_method`` : str
            Starting method for multiprocessing jobs, passed to
            :func:`~riogrande.helper.get_or_set_context`.

    Returns
    -------
    NDArray
        A boolean :class:`numpy.ndarray` of shape ``(height, width)`` that can
        be used as selector.

    Raises
    ------
    ValueError
        If no band is provided.

    See Also
    --------
    :func:`~riogrande.parallel.compute_mask` : Analogous function that writes the mask to file.
    :func:`~riogrande.helper.aggregated_selector` : Per-block aggregation function used internally.
    """
    if not bands:
        raise ValueError("prepare_selector requires at least one band")
    if verbose:
        print(f'prepare_selector - {bands=}')
    # make sure the bands are compatible
    first_source = bands[0].source
    if len(bands) > 1:
        first_source.check_compatibility(*(b.source for b in bands[1:]))
    # make sure the profile is up to date
    profile = first_source.import_profile()
    width = int(profile['width'])
    height = int(profile['height'])

    # set the per-block parameter
    _, inner_views = create_views(view_size=block_size,
                                  border=(0, 0),
                                  size=(width, height))
    # set the per job parameter
    block_params = [
        dict(task=aggregated_selector,
             bands=bands,
             view=view,
             task_params=dict(logic='all'),
             open_params=dict(mode='r'),
             read_params=dict(),
             extra_masking_band=extra_masking_band)
        for view in inner_views
    ]

    # prepare multiprocessing
    start_method = params.get('start_method', None)
    nbr_workers = get_nbr_workers(number=params.get('nbrcpu', None))
    # The aggregator occupies one worker until it receives the kill signal,
    # so a pool with a single worker would never start a block job.
    # (assumes get_nbr_workers returns an int or None)
    if nbr_workers is not None and nbr_workers < 2:
        nbr_workers = 2
    if verbose:
        print(f"using {nbr_workers=}")

    # The manager runs its own server process; the context manager makes sure
    # it is shut down again, also if a job fails.
    with Manager() as manager:
        aggr_q = manager.Queue()
        with get_or_set_context(start_method).Pool(nbr_workers) as pool:
            # start the aggregator job with an all-False selector to fill
            aggregator_job = pool.apply_async(
                func=fill_matrix,
                kwds=dict(
                    matrix=np.full((height, width), False),
                    aggr_q=aggr_q,
                ),
            )
            # start the block jobs
            block_jobs = [
                pool.apply_async(func=process_masks,
                                 kwds=dict(**bparams, aggr_q=aggr_q))
                for bparams in block_params
            ]
            # wait for all blocks; ``get`` re-raises errors from the workers
            for job in block_jobs:
                job.get()
            # all blocks are queued, tell the aggregator to finish
            aggr_q.put(dict(signal='kill'))
            pool.close()
            pool.join()
            selector, _ = aggregator_job.get()

    if selector is None:
        print("WARNING: The selector creation retunred no selector: "
              "All pixel are used!")
        selector = np.full(shape=(height, width), fill_value=True)
    return selector