riogrande.parallel
==================

.. py:module:: riogrande.parallel

.. autoapi-nested-parse::

   Block-parallel processing utilities for raster operations.

   This module provides worker and aggregation functions that underpin the block-wise parallel processing pipelines in ``riogrande``. Rasters are decomposed into spatial views (see :mod:`~riogrande.prepare`) and each view is processed independently by a pool of worker processes.

   Key responsibilities include:

   - **Block processing**: Reading a spatial block from a raster source and applying a task function (:func:`process_block`, :func:`process_masks`).
   - **Result aggregation**: Writing processed blocks back into an output raster (:func:`combine_views`, :func:`fill_matrix`, :func:`data_writer`).
   - **Mask computation**: Computing a combined boolean selector mask across multiple bands in parallel (:func:`compute_mask`, :func:`prepare_selector`).
   - **Queue-based runners**: A generic callback wrapper for multiprocessing queues (:func:`runner_call`).


Functions
---------

.. autoapisummary::

   riogrande.parallel.combine_views
   riogrande.parallel.compute_mask
   riogrande.parallel.data_writer
   riogrande.parallel.fill_matrix
   riogrande.parallel.prepare_selector
   riogrande.parallel.process_block
   riogrande.parallel.process_masks
   riogrande.parallel.runner_call


Module Contents
---------------

.. py:function:: combine_views(output_params, job_out_q)

   Listens to a queue and writes the provided views into a file.


.. py:function:: compute_mask(source, block_size, nodata=0, logic = 'all', bands = None, verbose = False, **params)

   Compute the mask of a dataset in parallel and write it out to the file.

   This function checks validity against the nodata rule across all bands, according to the selected logic (via :func:`~riogrande.helper.reduced_mask`).
   The dataset is split into blocks with :func:`~riogrande.prepare.create_views` and each block is processed by :func:`~riogrande.parallel.process_block` in a :class:`multiprocessing.pool.Pool` obtained via :func:`~riogrande.helper.get_or_set_context`.

   :param source: Path to the tif file or a Source object.
   :type source: str or :class:`~riogrande.io.models.Source`
   :param block_size: Size (width, height), in pixels, of the block that a single job processes.
   :type block_size: tuple[int, int]
   :param nodata: Nodata value to use for the mask computation.
   :type nodata: int or float
   :param logic: Either a string or a callable. Allowed strings are:

      - ``"any"`` : Mask each cell where *any* of the bands matches the nodata value.
      - ``"all"`` : Mask each cell where *all* of the bands match the nodata value.
   :type logic: str or Callable
   :param bands: An optional selection of :class:`~riogrande.io.models.Band` objects to use. If not provided, all bands are used.
   :type bands: list[Band] or None
   :param verbose: Print information about each processing step.
   :type verbose: bool
   :param \*\*params: Optional arguments for the multiprocessing:

      - ``nbrcpu`` : int
        Number of CPUs to use, passed to :func:`~riogrande.helper.get_nbr_workers`.
      - ``start_method`` : str
        Starting method for multiprocessing jobs, passed to :func:`~riogrande.helper.get_or_set_context`.
   :type \*\*params: dict

   :rtype: None

   .. seealso::

      :func:`~riogrande.parallel.prepare_selector`
          Build a boolean selector from band masks.

      :func:`~riogrande.helper.reduced_mask`
          Per-block mask computation function used internally.


.. py:function:: data_writer(writer, writer_params, aggr_q)

   Write out data using the context manager `writer`.

   This function can be used with the various context managers defined in the :class:`~riogrande.io.models.Source` and :class:`~riogrande.io.models.Band` classes, such as :meth:`~riogrande.io.models.Source.mask_writer` or :meth:`~riogrande.io.models.Band.data_writer`.

   :param writer: A :meth:`~riogrande.io.models.Source.data_writer` or :meth:`~riogrande.io.models.Source.mask_writer` context manager from a :class:`~riogrande.io.models.Source` or :class:`~riogrande.io.models.Band`.
   :type writer: Callable
   :param writer_params: Keyword arguments that will be passed to the `writer` method.
   :type writer_params: dict
   :param aggr_q: The queue this job listens to.
   :type aggr_q: Queue

   :returns: A timer object that can report the duration of the task.
   :rtype: :class:`~riogrande.timing.TimedTask`

   .. seealso::

      :func:`~riogrande.parallel.combine_views`
          Listener that writes views into a file.

      :func:`~riogrande.parallel.process_block`
          Worker that processes and enqueues a data block.


.. py:function:: fill_matrix(matrix, aggr_q)

   Fill a matrix with data received through a queue.

   Each received block is placed into `matrix` via :func:`~riogrande.prepare.update_view`.

   :param matrix: The :class:`numpy.ndarray` to fill with data.
   :type matrix: NDArray
   :param aggr_q: The :class:`multiprocessing.Queue` this job listens to. Each element in the queue must be a ``dict`` containing either:

      - ``{"view": ..., "data": ...}`` specifying where to write what.
      - ``{"signal": "kill"}`` to terminate the process and return the filled matrix.
   :type aggr_q: Queue

   :returns: The first object is the filled :class:`numpy.ndarray`; the second holds a :class:`~riogrande.timing.TimedTask` object with duration information.
   :rtype: matrix, (timer,)

   .. seealso::

      :func:`~riogrande.prepare.update_view`
          Write a block into a view of an array.

      :func:`~riogrande.parallel.prepare_selector`
          Uses this function as the aggregator job.


.. py:function:: prepare_selector(*bands, block_size, extra_masking_band = None, verbose=False, **params)

   Compute a boolean selector from masks of the provided :class:`~riogrande.io.models.Band` objects.

   Band masks are aggregated into a boolean selector (via :func:`~riogrande.helper.aggregated_selector`) that can be used to identify valid pixels across all provided bands.
   Optionally, an extra masking band may be applied where values equal to 0 are masked out.

   The dataset is split into blocks with :func:`~riogrande.prepare.create_views` and each block is processed by :func:`~riogrande.parallel.process_masks` in a :class:`multiprocessing.pool.Pool` obtained via :func:`~riogrande.helper.get_or_set_context`. Results are assembled by :func:`~riogrande.parallel.fill_matrix`.

   :param bands: A collection of :class:`~riogrande.io.models.Band` objects specifying which bands to use.
   :type bands: Band
   :param block_size: Size (width, height), in pixels, of the block that a single job processes.
   :type block_size: tuple[int, int]
   :param extra_masking_band: Optional :class:`~riogrande.io.models.Band` object that is treated as a rasterio mask, i.e. values equal to 0 will be masked.
   :type extra_masking_band: Band or None
   :param verbose: Print information about each processing step.
   :type verbose: bool
   :param \*\*params: Optional arguments for the multiprocessing:

      - ``nbrcpu`` : int
        The number of CPUs to use, passed to :func:`~riogrande.helper.get_nbr_workers`.
      - ``start_method`` : str
        Starting method for multiprocessing jobs, passed to :func:`~riogrande.helper.get_or_set_context`.
   :type \*\*params: dict

   :returns: A boolean :class:`numpy.ndarray` that can be used as a selector.
   :rtype: NDArray

   .. seealso::

      :func:`~riogrande.parallel.compute_mask`
          Analogous function that writes the mask to file.

      :func:`~riogrande.helper.aggregated_selector`
          Per-block aggregation function used internally.


.. py:function:: process_block(task, source, bands, view, task_params, read_params, open_params, out_q)

   Processes a section of the data in the source file.

   This is a general purpose function that can be used to process a large .tif in a parallelized manner. The view is converted to a :class:`rasterio.windows.Window` via :func:`~riogrande.helper.view_to_window`, and the result is enqueued using :func:`~riogrande.parallel.runner_call`.

   :param task: Function that will be called on the data from the specified band. The first argument of the function must be `data`, a :class:`numpy.ndarray` that holds the data from this section.
   :type task: Callable
   :param source: Either a string or a :class:`~riogrande.io.models.Source` object.
   :type source: str or Source
   :param bands: A collection of :class:`~riogrande.io.models.Band` objects specifying which bands to use.
   :type bands: Collection[Band] or None
   :param view: A tuple (x, y, width, height) defining the view of data to extract and process.
   :type view: tuple[int, int, int, int]
   :param task_params: Keyword arguments that will be passed to the callable `task`.
   :type task_params: dict
   :param read_params: Keyword arguments that are passed to the open method of the `source` object.
   :type read_params: dict
   :param open_params: Keyword arguments that are passed to the reader method of the `source` object.
   :type open_params: dict
   :param out_q: The queue this job will put the output of the callable `task` into.
   :type out_q: Queue

   :returns: A timer object that can report the duration of the task.
   :rtype: :class:`~riogrande.timing.TimedTask`

   .. seealso::

      :func:`~riogrande.parallel.process_masks`
          Analogous function operating on band masks.

      :func:`~riogrande.parallel.runner_call`
          Helper that enqueues the callback result.


.. py:function:: process_masks(task, bands, view, task_params, read_params, open_params, aggr_q, extra_masking_band = None)

   Processes a section of the mask for each band.

   This is a general purpose function that can be used to process a large .tif in a parallelized manner. The view is converted to a :class:`rasterio.windows.Window` via :func:`~riogrande.helper.view_to_window`, and the result is enqueued using :func:`~riogrande.parallel.runner_call`.

   :param task: Function that will be called on the masks of the specified bands. The first argument of the function must be `masks`, a list of :class:`numpy.ndarray` holding the masks from this section.
   :type task: Callable
   :param bands: A collection of :class:`~riogrande.io.models.Band` objects specifying which bands to use. Their mask readers are obtained via :meth:`~riogrande.io.models.Band.get_mask_reader`.
   :type bands: Collection[Band]
   :param view: A tuple (x, y, width, height) defining the view of data to extract and process.
   :type view: tuple[int, int, int, int]
   :param task_params: Keyword arguments that will be passed to the callable `task`.
   :type task_params: dict
   :param read_params: Keyword arguments that are passed to the open method of the `source` object.
   :type read_params: dict
   :param open_params: Keyword arguments that are passed to the reader method of the `source` object.
   :type open_params: dict
   :param aggr_q: The queue this job will put the output of the callable `task` into.
   :type aggr_q: Queue
   :param extra_masking_band: Optional :class:`~riogrande.io.models.Band` object that is treated as a rasterio mask, i.e. values equal to 0 are masked.

      .. warning::

         This Band is treated as a mask itself; its own mask is ignored.
   :type extra_masking_band: Band or None

   :returns: A timer object that can report the duration of the task.
   :rtype: :class:`~riogrande.timing.TimedTask`

   .. seealso::

      :func:`~riogrande.parallel.process_block`
          Analogous function operating on band data.

      :func:`~riogrande.parallel.runner_call`
          Helper that enqueues the callback result.


.. py:function:: runner_call(queue, callback, params, wrapper = None)

   Put the result of calling `callback` with `params` into the queue.

   The function calls ``callback(**params)``, and optionally passes the result through a wrapper function if provided.

   :param queue: A :class:`multiprocessing.Queue` into which the result (wrapped or unwrapped) will be placed.
   :type queue: Queue[Any]
   :param callback: A callable object (function or method) to be executed.
   :type callback: Callable
   :param params: A dictionary of keyword arguments passed to the callback.
   :type params: dict
   :param wrapper: A function applied to the callback result before it is placed into the queue. If ``None``, the raw result is placed into the queue.
   :type wrapper: Callable or None

   :returns: The unwrapped output.
   :rtype: dict

   .. seealso::

      :func:`~riogrande.parallel.process_block`
          Uses this function to enqueue block results.

      :func:`~riogrande.parallel.process_masks`
          Uses this function to enqueue mask results.
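
The behaviour documented for :func:`runner_call` can be illustrated with a minimal, single-process sketch. This is not the ``riogrande`` implementation: ``runner_call_sketch`` and ``block_sum`` are hypothetical names, ``queue.Queue`` stands in for a multiprocessing queue, and the wrapper that attaches a ``view`` to the result is an assumption made only for illustration.

```python
import queue
from typing import Any, Callable, Optional


def runner_call_sketch(q, callback: Callable[..., Any], params: dict,
                       wrapper: Optional[Callable[[Any], Any]] = None) -> Any:
    """Hypothetical stand-in mirroring the documented behaviour."""
    result = callback(**params)  # call the task with keyword arguments
    # Apply the wrapper only if one was provided; otherwise enqueue the raw result.
    item = result if wrapper is None else wrapper(result)
    q.put(item)
    return result  # the unwrapped output is handed back to the caller


def block_sum(data, scale=1):
    """Toy task: sum a block of values and scale the total."""
    return scale * sum(data)


# queue.Queue keeps the sketch single-process; a real pipeline would use a
# multiprocessing queue shared between workers and the aggregator.
out_q = queue.Queue()
raw = runner_call_sketch(
    out_q,
    block_sum,
    {"data": [1, 2, 3], "scale": 2},
    wrapper=lambda r: {"view": (0, 0, 3, 1), "data": r},  # assumed wrapper shape
)
queued = out_q.get()
```

Here ``raw`` holds the plain return value of the task, while ``queued`` holds the wrapped item that a listener would receive.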
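
The queue protocol described for :func:`fill_matrix` (items of the form ``{"view": ..., "data": ...}`` followed by a ``{"signal": "kill"}`` item) can be sketched as follows. This is an illustrative sketch under stated assumptions, not the library's code: ``fill_matrix_sketch`` is a hypothetical name, nested lists stand in for the :class:`numpy.ndarray` so that only the standard library is needed, the mapping of ``x`` to columns and ``y`` to rows is assumed, and the timer that the real function returns alongside the matrix is omitted.

```python
import queue


def fill_matrix_sketch(matrix, aggr_q):
    """Hypothetical listener: fill `matrix` from queue items until told to stop."""
    while True:
        item = aggr_q.get()
        if item.get("signal") == "kill":
            return matrix  # the real function also returns timing information
        # View layout (x, y, width, height) as documented; x -> column, y -> row
        # is an assumption of this sketch.
        x, y, width, height = item["view"]
        for row_offset in range(height):
            matrix[y + row_offset][x:x + width] = item["data"][row_offset]


aggr_q = queue.Queue()
aggr_q.put({"view": (0, 0, 2, 2), "data": [[True, True], [True, True]]})
aggr_q.put({"view": (2, 0, 2, 2), "data": [[False, True], [True, False]]})
aggr_q.put({"signal": "kill"})

matrix = [[False] * 4 for _ in range(2)]  # 2 rows by 4 columns
selector = fill_matrix_sketch(matrix, aggr_q)
```

Sending the kill signal last lets the listener drain every pending block before it returns, which is why the aggregator can run as a single long-lived job next to the worker pool.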