"""
This module contains various helper functions to parallelize the application
of filters on a tif
"""
from __future__ import annotations
import warnings
import numpy as np
import rasterio as rio
from copy import copy
from typing import Union
from collections.abc import Callable, Collection
from numpy.typing import NDArray
from multiprocessing import (Queue, Manager)
from riogrande.io import Source, Band, write_band
from riogrande.helper import (
view_to_window,
output_filename,
get_or_set_context,
get_nbr_workers
)
from riogrande.timing import TimedTask
from riogrande.prepare import create_views
from riogrande.parallel import runner_call
from .processing import (
view_blurred,
view_entropy,
_view_filtered,
view_interaction
)
from .filters.gaussian import compatible_border_size
def _combine_blurred_categories(output_params: dict, blur_q: Queue) -> TimedTask:
"""
Listen to a queue for blurred category blocks and write them to an output file.
This function continuously listens to a queue of blurred category data blocks
and writes each block to the specified output file. Each item received from
the queue represents a spatial region containing multiple categorical bands.
The function initializes the output raster according to the provided
parameters and writes each band sequentially until a termination signal is
received.
Parameters
----------
output_params :
Configuration parameters for the output file. Expected keys include:
- **as_dtype** : str or numpy.dtype
The data type to cast written arrays to before saving.
- **output_file** : str
Path to the output file to be written.
- **profile** : dict
Raster I/O profile used for file creation (e.g., driver, width, height).
- **nodata** : int or float, optional
Value used to represent nodata in the output. If not provided,
inherits from the profile or defaults to ``None``.
- **count** : int, optional
Number of bands in the output raster. If provided, overrides
``profile["count"]``.
- **bigtiff** : bool, optional
If ``True``, forces creation of a BigTIFF file. Defaults to ``False``.
- **verbose** : bool, optional
If ``True``, print progress and debug messages during execution.
Defaults to ``False``.
blur_q : Queue
A multiprocessing or threading queue providing blurred data blocks.
Each item in the queue is expected to be a dictionary with the following
keys:
- **signal** : str, optional
Control signal (e.g., ``"kill"``) used to terminate processing.
- **data** : dict[str, numpy.ndarray]
Mapping of category names (band identifiers) to data arrays.
- **view** : dict
Metadata describing the spatial region corresponding to the data block
(used to compute the write window).
Returns
-------
TimedTask
A `TimedTask` instance tracking the duration of the operation.
This can be used for profiling or performance analysis.
Notes
-----
The function blocks while waiting for items in the queue and terminates
gracefully when an item with ``signal="kill"`` is received. Proper queue
management is essential to avoid deadlocks or unfinished writes.
:func:`~riogrande.io.write_band` is called for each band to handle writing
and metadata tagging. Each output band is named ``LC_<band>`` for clarity.
See Also
--------
:func:`_block_category_extraction` : Produce blurred category blocks for this queue.
:func:`extract_categories` : Top-level function orchestrating category extraction.
"""
with TimedTask() as timer:
as_dtype = output_params.pop('as_dtype')
if isinstance(as_dtype, str):
as_dtype = np.dtype(as_dtype)
output_file = output_params.pop('output_file')
# print(f"{output_file=}")
# print(f"{dtype=}")
profile = output_params.pop('profile')
profile['dtype'] = as_dtype
# overwrite the profile if explicitely provided:
profile['nodata'] = output_params.pop(
'nodata', profile.get('nodata', None))
profile['count'] = output_params.get('count', profile['count'])
# check for Bigtiff
if output_params.pop('bigtiff', False):
profile['BIGTIFF'] = 'YES'
verbose = output_params.get('verbose', False)
with rio.open(output_file, 'w', **profile) as dst:
while True:
output = blur_q.get()
signal = output.get('signal', None)
if signal:
if signal == "kill":
# print(f"\n\nClosing: {output_file}\n\n")
break
categories_data = output.pop('data')
inner_view = copy(output.pop('view'))
# load block tif
# get the relevant block (i.e. remove borders)
# write to output file
w = view_to_window(inner_view)
for bidx, (band, data) in enumerate(categories_data.items(),
start=1):
# NOTE: downside of this is that we set the tags
# every time, unfortunately, in the FINALIZE_TASK
# we do not have the bidx
write_band(src=dst, bidx=bidx, data=data.astype(as_dtype), window=w,
category=band)
# NOTE: we might want to keep the description unchanged:
dst.set_band_description(bidx, f'LC_{band}')
if verbose:
print(f"Wrote out bands for blurred block {inner_view=}")
timer.new_lab()
# print(f"\n\n########\n\nProfile")
return timer
def _combine_entropy_blocks(output_params: dict, entropy_q: Queue):
"""
Listen to a queue for entropy blocks and write them to an output file.
This function continuously monitors a queue for entropy data blocks,
writing them sequentially to an output file until a termination signal
is received. It initializes the output file based on the provided
configuration and ensures proper cleanup when the process ends.
Parameters
----------
output_params : dict
Configuration parameters for the output file. Expected keys include:
- **output_dtype** : str or numpy.dtype
The data type of the output array.
- **output_file** : str
Path to the file where output data will be written.
- **profile** : dict
Dictionary of metadata or configuration options for the output.
- **out_band** : Band, optional
An existing `Band` object for writing output. If not provided,
a new `Band` will be created and initialized.
entropy_q : Queue
A multiprocessing or threading queue that provides entropy blocks.
Each item in the queue is expected to be a dictionary with the
following keys:
- **signal** : str, optional
Control signal (e.g., ``"kill"``) to terminate the process.
- **data** : numpy.ndarray
The data block to be written to the output.
- **view** : dict
Metadata describing the spatial or logical region corresponding
to the data block (used to compute the output window).
Returns
-------
TimedTask
A `TimedTask` instance tracking the total time taken for the operation.
This object can be used for profiling or performance analysis.
Notes
-----
The function blocks while waiting for new items in the queue. It terminates
gracefully when a dictionary with ``signal="kill"`` is received. Proper
queue management is required to prevent deadlocks or resource leaks.
See Also
--------
:func:`_block_entropy` : Produce entropy blocks for this queue.
:func:`compute_entropy` : Top-level function orchestrating entropy computation.
"""
with TimedTask() as timer:
output_dtype = output_params.pop('output_dtype')
if isinstance(output_dtype, str):
output_dtype = np.dtype(output_dtype)
output_file = output_params.pop('output_file')
# print(f"{output_file=}")
# print(f"{output_dtype=}")
profile = output_params.pop('profile')
profile['dtype'] = output_dtype
out_band = output_params.pop('out_band', None)
if out_band is None:
out_band = Band(source=Source(path=output_file),
bidx=1,
tags=dict(category='entropy'))
# create the file
out_band.init_source(profile=profile)
# write out the tags
out_band.export_tags()
# print(f'INIT:\n{out_band.source=}\n{out_band=}')
# with out_band.source.open(mode='r+', **profile) as dst:
with out_band.data_writer(mode='r+', **profile) as write:
while True:
# load the entropy_q
output = entropy_q.get()
signal = output.get('signal', None)
if signal:
if signal == "kill":
print(f"\n\nClosing: {out_band.source.path}\n\n")
break
data = output.pop('data')
view = copy(output.pop('view'))
w = view_to_window(view)
write(data, window=w)
# print(f"Wrote out entropy block {view=}")
timer.new_lab()
return timer
def _combine_interaction_blocks(output_params: dict, interaction_q: Queue):
"""
Listen to a queue for interaction blocks and write them to an output file.
This function continuously monitors a queue for interaction data blocks,
writing them sequentially to a specified output file until a termination
signal is received. It initializes the output file according to the
provided configuration parameters and handles tagged metadata output.
Parameters
----------
output_params : dict
Configuration parameters for the output file. Expected keys include:
- **output_dtype** : str or numpy.dtype
The data type of the output array.
- **output_file** : str
Path to the file where output data will be written.
- **profile** : dict
Dictionary of metadata or configuration options for the output file.
- **out_band** : Band, optional
An existing `Band` instance to handle writing. If not provided,
a new `Band` is created using `output_file`.
- **output_tag** : dict, optional
Metadata tags for the output file. Defaults to
``dict(category='interaction')``.
- **verbose** : bool, optional
If ``True``, print progress and debug messages. Defaults to ``False``.
interaction_q : Queue
A multiprocessing or threading queue that provides interaction blocks.
Each item in the queue is expected to be a dictionary with keys:
- **signal** : str, optional
Control signal (e.g., ``"kill"``) to terminate processing.
- **data** : numpy.ndarray
The data block to be written to the output file.
- **view** : dict
Metadata describing the region of the output file corresponding
to the data block (used for calculating the write window).
Returns
-------
TimedTask
A `TimedTask` instance tracking the duration of the operation.
This object can be used for profiling or performance analysis.
Notes
-----
The function blocks while waiting for new items in the queue and terminates
gracefully when a message with ``signal="kill"`` is received.
Proper queue management is required to prevent deadlocks or resource leaks.
See Also
--------
:func:`_block_interaction` : Produce interaction blocks for this queue.
:func:`compute_interaction` : Top-level function orchestrating interaction computation.
"""
with TimedTask() as timer:
output_dtype = output_params.pop('output_dtype')
if isinstance(output_dtype, str):
output_dtype = np.dtype(output_dtype)
output_file = output_params.pop('output_file')
# print(f"{output_file=}")
# print(f"{output_dtype=}")
profile = output_params.pop('profile')
profile['dtype'] = output_dtype
out_band = output_params.pop('out_band', None)
out_tag = output_params.pop('output_tag', dict(category='interaction'))
verbose = output_params.get('verbose', False)
if out_band is None:
out_band = Band(source=Source(path=output_file),
bidx=1,
tags=out_tag)
# create the file
out_band.init_source(profile=profile)
# write out the tags
out_band.export_tags()
# print(f'INIT:\n{out_band.source=}\n{out_band=}')
# with out_band.source.open(mode='r+', **profile) as dst:
with out_band.data_writer(mode='r+', **profile) as write:
while True:
# load the interaction_q
output = interaction_q.get()
signal = output.get('signal', None)
if signal:
if signal == "kill":
if verbose:
print(f"\n\nClosing: {out_band.source.path}\n\n")
break
data = output.pop('data')
view = copy(output.pop('view'))
w = view_to_window(view)
write(data, window=w)
if verbose:
print(f"Wrote out interaction block {view=}")
timer.new_lab()
return timer
def _block_category_extraction(params: dict, blur_q: Queue) -> TimedTask:
"""
Extract and filter category blocks for a specified raster view and push results to a queue.
This function processes a single spatial block (or "view") from a categorical
raster dataset, extracting per-category layers and optionally applying a
spatial filter (e.g., Gaussian blur). The processed block is then pushed to
a multiprocessing queue for downstream aggregation or writing.
Parameters
----------
params : dict
Dictionary containing configuration and input data for processing a single
raster block. Required keys include:
- **source** : str
Path to the source raster file to read from.
- **view** : tuple(int, int, int, int)
Outer window coordinates of the block to process, as
``(x, y, width, height)``.
- **inner_view** : tuple(int, int, int, int)
Usable region of the block (excluding padding or borders).
- **as_dtype** : str or numpy.dtype
Desired data type for the output blurred category arrays.
- **output_range** : tuple(float, float)
Target data range to map values into when converting to
``as_dtype``.
- **categories** : list[str]
List of category names or band identifiers to extract.
- **img_filter** : Callable, optional
A filter function to apply to each category layer (e.g.,
``skimage.filters.gaussian``).
- **filter_params** : dict, optional
Keyword arguments to pass to the filter callable.
- **filter_output_range** : tuple(float, float), optional
Expected numeric range of the filter’s output. If provided,
used to scale or normalize the filtered results.
blur_q : Queue
A multiprocessing or threading queue to which the blurred, multi-band
category maps are pushed. Each queue item will typically be a dictionary
containing processed data arrays and metadata describing the view.
Returns
-------
TimedTask
A `TimedTask` instance tracking the duration of the operation.
Useful for profiling and performance monitoring.
Notes
-----
This function serves as a wrapper that prepares parameters for
:func:`~convster.processing.view_blurred` and dispatches the computation
through :func:`~riogrande.parallel.runner_call`.
It is intended for internal use within a multiprocessing workflow that
coordinates reading, processing, and writing of raster data blocks.
The queue consumer (e.g., :func:`_combine_blurred_categories`) is responsible for
collecting and writing the processed results to disk.
See Also
--------
:func:`_combine_blurred_categories` : Queue consumer that writes blurred category blocks.
:func:`extract_categories` : Top-level function orchestrating category extraction.
"""
with TimedTask() as timer:
# this is only needed for the entropy part below
blur_params = dict(
source=params.get('source'),
view=params.get('view'),
inner_view=params.get('inner_view'),
categories=params.get('categories'),
img_filter=params.get('img_filter'),
filter_params=params.get('filter_params'),
filter_output_range=params.get('filter_output_range'),
output_dtype=params.get('as_dtype'),
output_range=params.get('output_range'),
)
_ = runner_call(
blur_q,
view_blurred,
blur_params
)
return timer
def _block_entropy(params: dict, entropy_q: Queue) -> TimedTask:
"""
Compute per-block heterogeneity measures based on entropy and push results to a queue.
This function computes Shannon entropy for each spatial block (or "view") in a raster
dataset, using pre-blurred or otherwise prepared category data. The resulting entropy
map is pushed into a multiprocessing queue for aggregation or writing.
Parameters
----------
params : dict
Configuration and data required for processing a single raster block.
Must include the following keys:
- **source** : str
Path to the input raster file to process.
- **view** : tuple(int, int, int, int)
Full extent of the block to process, as ``(x, y, width, height)``.
- **inner_view** : tuple(int, int, int, int)
Usable portion of the block (excluding padding or borders).
- **input_bands** : list[Band]
List of `Band` objects providing access to category data for entropy
computation. Each must support ``get_bidx()`` and ``get_data()`` methods.
- **output_dtype** : str or numpy.dtype
Data type for the entropy output (e.g., ``"float32"`` or ``"uint8"``).
- **output_range** : tuple(float, float)
Expected value range for the entropy output, used for normalization.
Optional keys include:
- **entropy_as_ubyte** : bool, default=False
If ``True``, normalize entropy values to the 0–255 range and cast to
unsigned bytes (``uint8``).
- **normed** : bool, default=True
Whether to normalize the category probabilities before computing entropy.
- **max_entropy_categories** : int or None, default=None
Maximum number of categories to consider in entropy normalization.
entropy_q : Queue
A multiprocessing or threading queue used to transmit entropy results.
Each item added to the queue typically includes a dictionary with keys
such as ``'data'`` (entropy array) and ``'view'`` (spatial metadata).
Returns
-------
TimedTask
A `TimedTask` instance tracking the duration of the operation.
Useful for profiling and performance monitoring.
Notes
-----
This function extracts category data from the provided input bands within the
specified window, computes entropy using :func:`~convster.processing.view_entropy`,
and sends the result to ``entropy_q`` via :func:`~riogrande.parallel.runner_call`.
See Also
--------
:func:`_combine_entropy_blocks` : Queue consumer that writes entropy blocks.
:func:`compute_entropy` : Top-level function orchestrating entropy computation.
"""
with TimedTask() as timer:
input_bands = params.pop('input_bands')
blurred_data = dict()
# for the entropy only the inner view is needed
view = params.get('inner_view')
window = view_to_window(view)
for band in input_bands:
bidx = band.get_bidx(match='category')
blurred_data[bidx] = band.get_data(window=window)
output_dtype = params.pop('output_dtype')
output_range = params.pop('output_range')
normed = params.pop('normed', True)
max_entropy_categories = params.pop('max_entropy_categories', None)
entropy_params = dict(
category_arrays=blurred_data,
view=view,
normed=normed,
max_entropy_categories=max_entropy_categories,
output_dtype=output_dtype,
output_range=output_range,
)
# This would return the entropy data
_ = runner_call(
entropy_q,
view_entropy,
entropy_params
)
return timer
def _block_interaction(params: dict, interaction_q: Queue) -> TimedTask:
"""
Compute per-block interaction measures between categories and push results to a queue.
This function processes a single spatial block (or "view") from a raster dataset,
computing interaction measures between categories (e.g., co-occurrence or overlap)
based on input band data. The computed interaction map is pushed to a
multiprocessing queue for aggregation or writing.
Parameters
----------
params : dict
Configuration and data required for processing a single raster block.
Must include the following keys:
- **source** : str
Path to the input raster file to process.
- **view** : tuple(int, int, int, int)
Full extent of the block to process, as ``(x, y, width, height)``.
- **inner_view** : tuple(int, int, int, int)
Usable region of the block (excluding padding or borders).
- **input_bands** : list[Band]
List of `Band` objects providing access to category data.
Each must implement ``get_bidx()`` and ``get_data()`` methods.
- **output_dtype** : str or numpy.dtype
Data type for the interaction output (e.g., ``"float32"`` or ``"uint8"``).
- **output_range** : tuple(float, float)
Expected range of output values used for normalization or scaling.
Optional keys include:
- **input_dtype** : str or numpy.dtype or None, default=None
Data type of the input category arrays, if conversion is required.
- **interaction_as_ubyte** : bool, default=False
If ``True``, normalize the interaction values to the 0–255 range and
cast to unsigned bytes (``uint8``).
- **standardize** : bool, default=False
Whether to standardize category arrays before computing interaction.
- **normed** : bool, default=True
Whether to normalize input data (e.g., probability normalization)
before computing interaction.
interaction_q : Queue
A multiprocessing or threading queue used to transmit interaction results.
Each item in the queue typically includes a dictionary containing the
computed interaction data and associated spatial metadata.
Returns
-------
TimedTask
A `TimedTask` instance tracking the duration of the operation.
Useful for profiling and performance monitoring.
Notes
-----
This function extracts category data from the provided input bands within
the specified window, computes an interaction measure via
:func:`~convster.processing.view_interaction`, and sends the results to
``interaction_q`` using :func:`~riogrande.parallel.runner_call`.
See Also
--------
:func:`_combine_interaction_blocks` : Queue consumer that writes interaction blocks.
:func:`compute_interaction` : Top-level function orchestrating interaction computation.
"""
with TimedTask() as timer:
input_bands = params.pop('input_bands')
blurred_data = dict()
# for the interaction only the inner view is needed
view = params.get('inner_view')
window = view_to_window(view)
for band in input_bands:
bidx = band.get_bidx(match='category')
blurred_data[bidx] = band.get_data(window=window)
input_dtype = params.pop('input_dtype', None)
output_dtype = params.pop('output_dtype')
output_range = params.pop('output_range')
standardize = params.pop('standardize', False)
normed = params.pop('normed', True)
interaction_as_ubyte = params.pop('interaction_as_ubyte', False)
interaction_params = dict(
view=view,
input_dtype=input_dtype,
standardize=standardize,
normed=normed,
category_arrays=blurred_data,
output_dtype=output_dtype,
output_range=output_range,
)
# This would return the interaction data
_ = runner_call(
interaction_q,
view_interaction,
interaction_params
)
return timer
[docs]
def compute_entropy(source: str | Source,
output_file: str,
block_size: tuple[int, int],
blur_params: dict,
categories: list | None = None,
output_dtype: type | str | None = None,
output_range: tuple | None = None,
normed: bool = True,
max_entropy_categories: int | None = None,
verbose: bool = False,
**params):
"""
Compute entropy-based heterogeneity from categorical raster bands.
This function orchestrates a parallel computation of entropy-based
heterogeneity across multiple raster blocks, combining per-block results
into a single output GeoTIFF. Each block is processed independently via
multiprocessing workers that read subsets of the input raster, compute
entropy over selected categorical bands, and write results to a queue
for aggregation.
Parameters
----------
source : str or Source
Path to the categorical raster file (e.g., land-cover map) or an
initialized `Source` object providing access to the dataset.
output_file : str
Path where the final entropy GeoTIFF will be written.
block_size : tuple of int
Block dimensions ``(width, height)`` in pixels. Determines the size
of the chunks processed by each worker.
blur_params : dict
Dictionary of parameters for Gaussian blur or other preprocessing.
Typically includes at least one of ``'sigma'`` or ``'diameter'`` in
spatial units (e.g., meters). Used primarily for output file naming.
categories : list, optional
List of category names or identifiers to include in entropy
computation. If not provided, all available categories in the source
raster are used.
output_dtype : str, type, or None, optional
Desired data type for the output entropy raster. If not provided,
defaults to ``numpy.uint8``.
output_range : tuple of float, optional
Minimum and maximum output values. If not set, the range is inferred
from ``output_dtype`` — [0, 1] for floating-point types, or the
valid range for integer types (e.g., [0, 255] for uint8).
normed : bool, default=True
Whether to normalize the entropy values. When True, entropy is scaled
relative to the maximum possible entropy determined by the number of
categories.
max_entropy_categories : int, optional
Specifies the number of categories to assume for maximum entropy
normalization. Ignored if ``normed=False``.
verbose : bool, default=False
If True, prints progress updates and diagnostic information.
**params
Additional optional parameters controlling multiprocessing and output:
- **nbrcpu** : int
Number of CPU cores to use. Defaults to the number of available
cores minus one.
- **start_method** : str
Multiprocessing start method (e.g., ``"spawn"`` or ``"fork"``).
- **entropy_as_ubyte** : bool, optional
Deprecated. Use ``output_dtype="uint8"`` instead.
- **compress** : bool, default=False
If True, compresses the final GeoTIFF using LZW compression.
Returns
-------
output_file : str
Path to the resulting entropy raster file.
Notes
-----
- Each processing block computes entropy over the given categories using
the internal :func:`_block_entropy` function and sends results to a queue.
- The function :func:`_combine_entropy_blocks` merges these intermediate
results into a single raster file.
- The operation can be parallelized across multiple CPUs to handle
large raster datasets efficiently.
- Block decomposition uses :func:`~riogrande.prepare.create_views`.
- Worker pool is created with :func:`~riogrande.helper.get_or_set_context`
and sized using :func:`~riogrande.helper.get_nbr_workers`.
- Output filename is constructed with :func:`~riogrande.helper.output_filename`.
See Also
--------
:func:`compute_interaction` : Compute spatial interaction between categorical bands.
:func:`extract_categories` : Extract per-category maps with optional filtering.
:func:`_block_entropy` : Worker function computing entropy for a single block.
"""
print(f'compute_entropy - {source=}, {categories=}')
if isinstance(source, str):
source = Source(path=source)
with source.open(mode='r') as src:
width = src.width
height = src.height
profile = copy(src.profile)
if verbose:
print("The chosen source tif has a dimension of:"
f"\n\t{width=}\n\t{height=}\n")
# adapt the profile:
profile['count'] = 1 # single band for entropy
# now let's prepare the output parameters:
if categories is None:
categories = list(source.get_tag_values(tag='category').values())
print('WARNING: Inferring the number of categories to use from the\n'
' source file.')
input_bands = [Band(source=source, tags=dict(category=category))
for category in categories]
if verbose:
band_choice_str = '\n\t'.join(
(f'{band.get_bidx()}:{band.tags}' for band in input_bands))
print("Chosen bands for the entropy calculation:\n"
f"\t{band_choice_str} \n")
entropy_output_file = output_filename(
base_name=output_file,
out_type='entropy',
blur_params=blur_params
)
# handle deprecated parameters
entropy_as_ubyte = params.pop('entropy_as_ubyte', None)
if entropy_as_ubyte is not None:
if entropy_as_ubyte:
output_dtype = "uint8"
else:
output_dtype = "float64"
warnings.warn("The parameter `entropy_as_ubyte` is deprecated, use "
f"`output_dtype` instead!\nUsing "
f"{entropy_as_ubyte=} leads to "
f"{output_dtype=}",
category=DeprecationWarning)
if output_dtype is None:
output_dtype = np.uint8
warnings.warn("No `output_dtype` provided for result!\nUsing "
f"{output_dtype=} default instead")
entropy_output_params = dict(
input_bands=input_bands,
# blur_params=blur_params,
profile=profile,
output_dtype=output_dtype,
output_file=entropy_output_file,
output_tags=dict(category='entropy'),
)
_, inner_views = create_views(view_size=block_size,
border=(0, 0),
size=(width, height))
block_params = []
for inner_view in inner_views:
bparams = dict(input_bands=input_bands,
categories=categories,
inner_view=inner_view,
output_dtype=output_dtype,
output_range=output_range,
normed=normed,
max_entropy_categories=max_entropy_categories, )
block_params.append(bparams)
# ###
# prepare multiprocessing
# ###
manager = Manager()
entropy_q = manager.Queue()
start_method = params.get('start_method', None)
# get number of cpu's
nbr_workers = get_nbr_workers(number=params.pop('nbrcpu', None))
if verbose:
print(f"using {nbr_workers=}")
with get_or_set_context(start_method).Pool(nbr_workers) as pool:
# start the entropy writer task
entropy_combiner = pool.apply_async(_combine_entropy_blocks,
(entropy_output_params, entropy_q))
# start the block processing
all_jobs = []
for bparams in block_params:
all_jobs.append(pool.apply_async(_block_entropy,
(bparams, entropy_q)))
# collect results
job_timers = []
for job in all_jobs:
# await for the jobs to return (i.e. complete) by calling .get()
# get the duration from the timer object that is returned by .get()
job_timers.append(job.get().get_duration())
# once we have all the blocks, add a last element to the queue to stop
# the combination process
entropy_q.put(dict(signal='kill'))
pool.close()
# wait for the *_combiner tasks to finish
pool.join()
# lzw-compress final output
compress = params.pop('compress', False)
if compress:
out_source = Source(entropy_output_file)
out_source.compress(output=None)
entropy_output_file = str(out_source.path)
print("Files compressed successfully")
total_duration = entropy_combiner.get().get_duration()
print(f"{total_duration=}")
print(f"maximal duration of single job: {max(job_timers)=}")
return entropy_output_file
[docs]
def compute_interaction(source: str | Source,
output_file: str,
block_size: tuple[int, int],
blur_params: dict,
categories: list | None = None,
output_dtype: type | str | None = None,
output_range: tuple | None = None,
standardize: bool = False,
normed: bool = True,
verbose: bool = False,
**params):
"""
Compute interaction strength between categorical raster bands.
This function computes pairwise, three-way, or higher-order interaction
measures among categorical raster bands (e.g., land-cover classes).
The computation is performed block-wise using multiprocessing, where
each worker processes a subset of the raster and pushes intermediate
results to a queue. The results are then combined into a single output
raster representing spatial interaction strength.
Parameters
----------
source : str or Source
Path to the categorical raster file (e.g., a land-cover map) or an
initialized `Source` object providing access to the dataset.
output_file : str
Path where the final interaction raster will be saved.
block_size : tuple of int
Block dimensions ``(width, height)`` in pixels that define the size
of the chunks processed by each multiprocessing worker.
blur_params : dict
Dictionary containing parameters for Gaussian blur or other
preprocessing. Used primarily for formatting the output filename.
Should include either ``'sigma'`` or ``'diameter'`` in spatial units.
categories : list, optional
List of categories (e.g., land-cover types) to include in the
interaction computation. If not provided, all categories available
in the source raster are used.
output_dtype : str, type, or None, optional
Desired data type for the output raster. If not provided, defaults
to ``numpy.uint8``.
output_range : tuple of float, optional
Minimum and maximum values for scaling the output. If not specified,
inferred automatically from ``output_dtype``—[0, 1] for floats, or
the valid integer range for integer types (e.g., [0, 255] for uint8).
standardize : bool, default=False
Whether to standardize the data prior to computing interactions.
This can improve comparability across categories with different
magnitudes or distributions.
normed : bool, default=True
If True, normalizes the computed interaction strengths relative to
their theoretical maximum values.
verbose : bool, default=False
If True, prints detailed progress and diagnostic messages during
processing.
**params
Additional optional parameters controlling multiprocessing and output:
- **nbrcpu** : int
Number of CPU cores to use. Defaults to the number of available
cores minus one.
- **start_method** : str
Multiprocessing start method (e.g., ``"spawn"`` or ``"fork"``).
- **interaction_as_ubyte** : bool, optional
Deprecated. Use ``output_dtype="uint8"`` instead.
- **compress** : bool, default=False
If True, compresses the final GeoTIFF using LZW compression.
Returns
-------
output_file : str
Path to the resulting interaction raster file.
Notes
-----
- The computation proceeds in parallel by dividing the raster into
independent processing blocks.
- Each worker executes an internal :func:`_block_interaction` task and writes
its results to a multiprocessing queue.
- The :func:`_combine_interaction_blocks` function merges all block results
into a single output file.
- Designed for efficient large-scale raster analysis on multicore systems.
- Block decomposition uses :func:`~riogrande.prepare.create_views`.
- Worker pool is created with :func:`~riogrande.helper.get_or_set_context`
and sized using :func:`~riogrande.helper.get_nbr_workers`.
- Output filename is constructed with :func:`~riogrande.helper.output_filename`.
See Also
--------
:func:`compute_entropy` : Compute spatial entropy of categorical bands.
:func:`extract_categories` : Extract per-category maps with optional filtering.
:func:`_block_interaction` : Worker function computing interaction for a single block.
"""
if isinstance(source, str):
source = Source(path=source)
with source.open(mode='r') as src:
width = src.width
height = src.height
profile = copy(src.profile)
input_dtype = np.dtype(profile['dtype'])
if verbose:
print("The chosen source tif has a dimension of:"
f"\n\t{width=}\n\t{height=}\n")
# adapt the profile:
profile['count'] = 1 # single band for interaction
# now let's prepare the output parameters:
if categories is None:
categories = list(source.get_tag_values(tag='category').values())
print('WARNING: Inferring the number of categories to use from the\n'
' source file.')
input_bands = [Band(source=source, tags=dict(category=category))
for category in categories]
if verbose:
band_choice_str = '\n\t'.join(
(f'{band.get_bidx()}:{band.tags}' for band in input_bands))
print("Chosen bands for the interaction calculation:\n"
f"\t{band_choice_str} \n")
interaction_output_file = output_filename(
base_name=output_file,
out_type='interaction',
blur_params=blur_params
)
# handle deprecated parameters
interaction_as_ubyte = params.pop('interaction_as_ubyte', None)
if interaction_as_ubyte is not None:
if interaction_as_ubyte:
output_dtype = "uint8"
else:
output_dtype = "float64"
warnings.warn("The parameter `interaction_as_ubyte` is deprecated, use "
f"`output_dtype` instead!\nUsing "
f"{interaction_as_ubyte=} leads to "
f"{output_dtype=}",
category=DeprecationWarning)
if output_dtype is None:
output_dtype = np.uint8
warnings.warn("No `output_dtype` provided for result!\nUsing "
f"{output_dtype=} default instead")
interaction_output_params = dict(
input_bands=input_bands,
profile=profile,
output_dtype=output_dtype,
output_file=interaction_output_file,
output_tags=dict(category='interaction'),
)
_, inner_views = create_views(view_size=block_size,
border=(0, 0),
size=(width, height))
block_params = []
for inner_view in inner_views:
bparams = dict(input_bands=input_bands,
categories=categories,
input_dtype=input_dtype,
inner_view=inner_view,
output_dtype=output_dtype,
output_range=output_range,
standardize=standardize,
normed=normed, )
block_params.append(bparams)
# ###
# prepare multiprocessing
# ###
manager = Manager()
interaction_q = manager.Queue()
start_method = params.get('start_method', None)
# get number of workers
nbr_workers = get_nbr_workers(number=params.pop('nbrcpu', None))
if verbose:
print(f"using {nbr_workers=}")
with get_or_set_context(start_method).Pool(nbr_workers) as pool:
# start the interaction writer task
interaction_combiner = pool.apply_async(_combine_interaction_blocks,
(interaction_output_params, interaction_q))
# start the block processing
all_jobs = []
for bparams in block_params:
all_jobs.append(pool.apply_async(_block_interaction,
(bparams, interaction_q)))
# collect results
job_timers = []
for job in all_jobs:
# await for the jobs to return (i.e. complete) by calling get()
# get the duration from the timer object that is returned by get()
job_timers.append(job.get().get_duration())
# once we have all the blocks, add a last element to the queue to stop
# the combination process
interaction_q.put(dict(signal='kill'))
pool.close()
# wait for the *_combiner tasks to finish
pool.join()
# lzw-compress final output
compress = params.pop('compress', False)
if compress:
out_source = Source(interaction_output_file)
out_source.compress(output=None)
interaction_output_file = str(out_source.path)
print("Files compressed successfully")
total_duration = interaction_combiner.get().get_duration()
print(f"{total_duration=}")
print(f"maximal duration of single job: {max(job_timers)=}")
return interaction_output_file
[docs]
def apply_filter(source: str | Source,
output_file: str,
block_size: tuple[int, int],
bands: list[Band] | None = None,
data_in_range: None | NDArray | Collection = None,
data_as_dtype: type | None = np.uint8,
data_output_range: None | NDArray | Collection = None,
replace_nan_with: Union[int, float] | None = None,
img_filter=None,
filter_params: dict | None = None,
filter_output_range: Collection | None = (0., 1.),
output_dtype: type | None = np.uint8,
output_range: tuple | None = None,
selector_band: Band | None = None,
verbose: bool = False,
output_params: None | dict = None,
**params
) -> str:
"""
Apply a filter to one or more bands of a raster and export the result.
This function processes the raster in blocks to allow for memory-efficient
and parallelized computation. Each block is filtered and then combined into
a single output raster. Optionally, categorical masking can be used with
`selector_band`.
Parameters
----------
source : str or Source
Path to the input raster file or a `Source` object.
output_file : str
Path where the filtered raster will be written.
block_size : tuple of int
Size of the processing blocks in pixels as ``(width, height)``.
bands : list of Band, optional
Specific bands to process. If None, all bands in the raster are used.
data_in_range : array-like, optional
Input range used for rescaling loaded data before filtering.
data_as_dtype : type, optional, default=np.uint8
Data type to which input data is converted before filtering.
data_output_range : array-like, optional
Output range for data rescaling after conversion to `data_as_dtype`.
replace_nan_with : int or float, optional
Value used to replace NaNs in the input before filtering.
img_filter : callable, optional
Filter function to apply to the data (e.g., `skimage.filters.gaussian`).
filter_params : dict, optional
Parameters to pass to `img_filter`.
filter_output_range : collection, default=(0., 1.)
Expected output range of the applied filter function.
output_dtype : type, optional, default=np.uint8
Data type of the filtered output raster.
output_range : tuple, optional
Value range to rescale the final output raster.
selector_band : Band, optional
Categorical band used as a mask to apply the filter
selectively across categories.
output_params : dict, optional
Additional output configuration:
- **as_dtype** : data type for filtered output (overrides `output_dtype`)
- **nodata** : value for missing data (default: None)
- **bigtiff** : bool, whether to create a BIGTIFF for >4GB files
.. note:: This may become standard at a later point
verbose : bool, default=False
Print progress and debug information.
**params
Additional optional arguments:
- **nbrcpu** : int, number of CPU cores for multiprocessing.
- **start_method** : str, multiprocessing start method.
- **compress** : bool, whether to compress the final output with LZW.
Returns
-------
output_file : str
Path to the resulting filtered raster file.
Notes
-----
- Deprecated parameters (`output_dtype` vs `output_params['as_dtype']`) are internally handled with warnings.
- Borders for filtering are automatically computed from the filter kernel
using :func:`~convster.filters.gaussian.compatible_border_size`.
- Block decomposition uses :func:`~riogrande.prepare.create_views`.
- Worker pool is created with :func:`~riogrande.helper.get_or_set_context`
and sized using :func:`~riogrande.helper.get_nbr_workers`.
- Each block is processed by :func:`~convster.processing._view_filtered`
via :func:`~riogrande.parallel.runner_call`.
See Also
--------
:func:`extract_categories` : Extract per-category maps with optional filtering.
:func:`compute_entropy` : Compute spatial entropy of categorical bands.
"""
if isinstance(source, str):
source = Source(path=source)
with source.open() as src:
width = src.width
height = src.height
if verbose:
print("The chosen source tif has a dimension of:"
f"\n\t{width=}\n\t{height=}")
print(f"The block size without border is {block_size=} pixels")
if bands is None:
bands = source.get_bands()
else:
sources = set()
sources.add(source)
for band in bands:
sources.add(band.source)
assert len(sources) == 1, "Only bands with the same source are " \
f"allowed!\nWe have\n\t{sources=}"
source = sources.pop()
# prepare output params
if output_params is None:
output_params = dict()
_out_dtype = output_params.get('as_dtype', None)
if _out_dtype is not None and _out_dtype != output_dtype:
raise ValueError(
"Provided two different output dtypes:"
f"{output_params['as_dtype']=} and {output_dtype=}"
)
output_params['as_dtype'] = output_dtype
# we pass indexes
indexes = [band.get_bidx() for band in bands]
profile = source.import_profile()
border = compatible_border_size(**filter_params)
if verbose:
print(f"The resulting border size is {border=} pixels")
# we need to set nodata to None if input is different dtype than output and input has Nodata e.g. nan to unit8
output_params['nodata'] = None
# set the parameter for the resulting file
blur_output_params = dict(
profile=profile,
count=len(indexes),
output_file=output_file,
)
# use directly the `output_params` arg:
blur_output_params.update(output_params)
views, inner_views = create_views(view_size=block_size,
border=border,
size=(width, height))
block_params = []
# The parameter for the filter we want to apply:
for view, inner_view in zip(views, inner_views):
bparams = dict(source=str(source.path),
bands=bands,
view=view,
inner_view=inner_view,
data_in_range=data_in_range,
data_as_dtype=data_as_dtype,
data_output_range=data_output_range,
replace_nan_with=replace_nan_with,
img_filter=img_filter,
filter_params=filter_params,
filter_output_range=filter_output_range,
as_dtype=output_dtype,
output_range=output_range,
selector_band=selector_band,
)
block_params.append(bparams)
# ###
# prepare multiprocessing
# ###
manager = Manager()
blur_q = manager.Queue()
# get number of cpu's
nbr_workers = get_nbr_workers(number=params.pop('nbrcpu', None))
if verbose:
print(f"using {nbr_workers=}")
start_method = params.get('start_method', None)
with get_or_set_context(start_method).Pool(nbr_workers) as pool:
# start the blurred category writer task
blur_combiner = pool.apply_async(
func=_combine_blurred_categories,
kwds=dict(output_params=blur_output_params, blur_q=blur_q)
)
# start the block processing
all_jobs = []
for bparams in block_params:
all_jobs.append(pool.apply_async(
func=runner_call,
kwds=dict(queue=blur_q,
callback=_view_filtered,
params=bparams)
))
# collect results
job_outputs = []
for job in all_jobs:
# await for the jobs to return (i.e. complete) by calling .get()
# get the duration from the timer object that is returned by .get()
job_outputs.append(job.get())
# once we have all the blocks, add a last element to the queue to stop
# the combination process
blur_q.put(dict(signal='kill'))
pool.close()
# wait for the *_combiner tasks to finish
pool.join()
total_duration = blur_combiner.get().get_duration()
print(f"{total_duration=}")
# lzw-compress final output
compress = params.pop('compress', False)
if compress:
out_source = Source(output_file)
out_source.compress(output=None)
output_file = str(out_source.path)
print("Files compressed successfully")
return output_file