"""
Data preparation and derived metric computation for categorical raster maps.
This module provides functions for extracting, filtering, and transforming
data from categorical raster maps, as well as for computing per-cell derived
metrics. It is the primary processing layer of the ``convster`` package.
Key functionality includes:
- **Category handling**: Selecting pixels by category value, listing available
categories, and extracting per-category data arrays (:func:`select_category`,
:func:`get_categories`, :func:`get_category_data`).
- **Filter application**: Applying arbitrary image filters per category with
optional output rescaling (:func:`get_filtered_categories`,
:func:`get_category_data`).
- **Entropy computation**: Computing per-cell Shannon entropy across category
probability layers (:func:`compute_entropy`, :func:`get_max_entropy`,
:func:`get_entropy_view`).
- **Interaction metrics**: Computing per-cell multi-layer interaction values
normalised by the theoretical maximum (:func:`view_interaction`).
- **Visualisation helpers**: Generating entropy and interaction arrays ready
for display or export (:func:`view_entropy`, :func:`view_interaction`).
"""
from __future__ import annotations
from collections.abc import Callable, Collection, Sequence
import warnings
import numpy as np
from numpy.typing import NDArray
from scipy.stats import entropy
from skimage.filters import gaussian
from riogrande.helper import dtype_range, convert_to_dtype
from riogrande.io import Source, Band, load_block
from riogrande.prepare import get_view, relative_view
from .filters import bpgaussian
[docs]
def select_category(data: NDArray, category: int | list[int],
as_dtype: type | str = "uint8", limits: tuple | None = None) -> NDArray:
"""
Filter an array for particular category or categories.
Parameters
----------
data : NDArray
Input matrix of integers indicating the category of each pixel.
category : int or list[int]
The category (or list of categories) to select.
as_dtype : type or str
Data type of the output matrix.
.. note::
The output matrix will contain the maximal value possible for this
data type in cells that match `category`, and the minimal value
in all other cells.
limits : tuple or None
Custom limits for output values. Must be a pair `(is_value, is_not_value)`.
If provided, these override the default min/max values inferred from
`as_dtype`.
Returns
-------
NDArray
Matrix of type `as_dtype` with the same shape as `data`.
See Also
--------
:func:`get_categories` : Infer the list of categories from an array.
:func:`get_category_data` : Extract and optionally filter data for one or more categories.
Examples
--------
>>> data = np.array([
... [0, 1, 2],
... [2, 1, 0],
... [1, 0, 2]
... ])
>>> select_category(data, category=1)
array([[ 0, 255, 0],
[ 0, 255, 0],
[255, 0, 0]], dtype=uint8)
>>> select_category(data, category=[1, 2], as_dtype="int16", limits=(1000, -1000))
array([[-1000, 1000, 1000],
[ 1000, 1000, -1000],
[ 1000, -1000, 1000]], dtype=int16)
"""
if isinstance(as_dtype, str):
_as_dtype = np.dtype(as_dtype)
else:
_as_dtype = as_dtype
if limits:
_is, _is_not = limits
else:
_is, _is_not = map(lambda x: np.array(x).astype(
_as_dtype), dtype_range(_as_dtype))
if isinstance(category, int):
_selected = [category, ]
else:
_selected = category
return np.where(np.isin(data, _selected), _is, _is_not)
[docs]
def get_categories(data: NDArray) -> list[int]:
"""
Return the sorted list of categories present in the data.
Parameters
----------
data : NDArray
Array of integers indicating the category of each pixel.
Returns
-------
list of int
Sorted list of unique categories present in the data.
Uses :func:`numpy.unique` to determine unique values.
See Also
--------
:func:`select_category` : Filter an array for a specific category.
:func:`get_filtered_categories` : Extract all categories into separate arrays.
Examples
--------
>>> a = np.array([[0, 1, 2],
... [2, 1, 0],
... [1, 0, 2]])
>>> get_categories(a)
[0, 1, 2]
"""
categories = np.unique(data)
categories.sort()
print("Inferring the number of categories from the provided data."
f"\nGot:\t{categories}")
return list(map(int, categories))
[docs]
def get_category_data(data: NDArray,
category: int | list[int],
img_filter: Callable | None = None,
filter_params: dict | None = None,
filter_output_range: tuple | None = None,
as_dtype: type | str | None = None,
output_range: tuple | None = None,
data_as_dtype: type | str = "uint8") -> NDArray:
"""
Return the data of one or more categories, optionally after applying a filter.
Parameters
----------
data : NDArray
Matrix indicating the per-cell category.
category : int or list[int]
The category (or categories) to extract.
img_filter : Callable or None
Filter function applied to the selected category data (e.g.,
``skimage.filters.gaussian``).
filter_params : dict or None
Parameters passed to ``img_filter``.
filter_output_range : tuple or None
Output value range to apply after filtering, if applicable.
as_dtype : type or str or None
Desired data type of the output array.
output_range : tuple or None
Custom value range ``(min, max)`` to which the output will be scaled.
Useful when filters produce floating-point values.
For example, a Gaussian filter returns a ``float64`` array with
values in ``[0, 1]``. With ``as_dtype="uint8"``, these values are
mapped to ``[0, 255]``, reducing memory usage.
data_as_dtype : type or str
Data type of the array used to encode the category mask before filtering.
Default is ``"uint8"``. For datasets with more than 255 categories,
``"uint16"`` may be more appropriate.
Returns
-------
Filtered or unfiltered array of the selected category, converted and
scaled according to the specified options.
Notes
-----
- If no image filter is provided, either ``as_dtype`` or ``output_range``
must be set to define the data type or range of the output array.
- If an image filter is provided, ``as_dtype`` converts the data before
the filter is applied.
See Also
--------
:func:`select_category` : Create a binary indicator array for a category.
:func:`get_filtered_categories` : Apply this function across all categories.
:func:`_filter_data` : Apply a filter and rescale the resulting data.
"""
# strip the category/categories
_data = select_category(data, category, as_dtype=data_as_dtype)
filter_params = filter_params or dict()
# apply the image filter if provided
_data = _filter_data(data=_data,
img_filter=img_filter,
filter_params=filter_params,
filter_output_range=filter_output_range,
as_dtype=as_dtype,
output_range=output_range)
return _data
[docs]
def get_filtered_categories(data: NDArray,
categories: None | Collection = None,
img_filter: None | Callable = None,
output_dtype: type | str | None = "uint8",
output_range: tuple | None = None,
filter_output_range: tuple | None = None,
filter_params: dict | None = None) -> dict[int, NDArray]:
"""
Extract each category from a data array into separate arrays and optionally apply a filter.
Parameters
----------
data : NDArray
Array containing integer categories, e.g., a land-cover type matrix.
categories : Collection or None
Collection of categories to extract. If None, all categories in `data` are extracted.
img_filter : Callable or None
Callable to apply as a filter to each category array (e.g., `skimage.filters.gaussian`).
output_dtype : type or str or None
Data type for the returned arrays (default: "uint8").
output_range : tuple or None
Range to rescale the filtered arrays.
filter_output_range : tuple or None
Expected output range of the filter for proper scaling.
filter_params : dict or None
Dictionary of parameters to pass to the filter function.
Returns
-------
dict
A dictionary mapping each category to its filtered and optionally rescaled array.
Notes
-----
- See :func:`get_category_data` for details on extracting category-specific data.
See Also
--------
:func:`get_category_data` : Extract and optionally filter data for one category.
:func:`get_categories` : Infer the list of categories from an array.
"""
if categories is None:
categories = get_categories(data)
all_categories = dict()
for category in categories:
_data = get_category_data(data=data, category=category,
img_filter=img_filter,
filter_params=filter_params,
filter_output_range=filter_output_range,
as_dtype=output_dtype,
output_range=output_range,
)
all_categories[category] = _data
return all_categories
[docs]
def get_max_entropy(nbr: int) -> float:
"""
Maximum entropy value for a given number of categories.
The maximum Shannon entropy occurs when the distribution is uniform
across `nbr` categories, i.e. all categories have equal probability.
Parameters
----------
nbr : int
The number of categories.
Returns
-------
float
The maximal entropy for a uniform distribution with `nbr` categories.
Computed using :func:`scipy.stats.entropy` with a uniform distribution.
See Also
--------
:func:`compute_entropy` : Compute per-cell entropy over a series of data arrays.
Examples
--------
>>> get_max_entropy(2)
0.6931471805599453
>>> get_max_entropy(10)
2.302585092994046
"""
return entropy(np.ones(nbr))
[docs]
def compute_entropy(data_arrays: Sequence[NDArray],
normed: bool = True,
max_entropy_categories: int | None = None,
as_dtype: type | str | None = None,
output_range: tuple | None = None,
**entropy_params) -> NDArray:
"""
Compute per-cell entropy over a series of data arrays.
The input arrays are stacked along a new axis, and entropy is calculated for each cell.
The resulting array can be normalized, converted to a different dtype, and rescaled
to a specified output range.
Parameters
----------
data_arrays : Sequence[NDArray]
Sequence of arrays to compute per-cell entropy over. All arrays must have the same shape.
normed : bool
If True (default), entropy values are normalized according to the maximum possible entropy.
If False, the raw entropy is returned without rescaling.
max_entropy_categories : int or None
Maximum number of categories to use for normalization when `normed=True`.
Ignored if `normed=False`.
as_dtype : type or str or None
Data type for the output array. Useful to reduce memory usage when `normed=True`.
output_range : tuple or None
Range to rescale normalized entropy values. Ignored if `normed=False`.
**entropy_params : dict
Additional keyword arguments passed to :func:`scipy.stats.entropy`.
Returns
-------
NDArray
Array of the same shape as the input arrays, containing the per-cell entropy.
Notes
-----
- When `normed=True`, the entropy is mapped to [0, 1] for float outputs by default,
or to the full range of the specified integer type if `as_dtype` is integer.
- Converting to a different dtype without normalization may produce unbounded results.
- For large arrays, using a smaller `as_dtype` (e.g., 'uint8') can save memory.
- Normalization uses :func:`get_max_entropy` to determine the maximum entropy given
the number of input arrays, and :func:`~riogrande.helper.convert_to_dtype` for
rescaling.
See Also
--------
:func:`get_max_entropy` : Compute the maximum entropy for a given number of categories.
:func:`_get_entropy` : Internal wrapper combining blurring and entropy computation.
Examples
--------
>>> data1 = np.array([[10, 5],
... [4, 1]])
>>> data2 = np.array([[1, 5],
... [2, 9]])
>>> compute_entropy([data1, data2], normed=True, as_dtype='float32')
array([[0.439497 , 1. ],
[0.91829586, 0.4689956 ]], dtype=float32)
"""
# calculate the entropy
_stacked = np.stack(data_arrays, axis=2)
entropy_array = entropy(_stacked, axis=2, **entropy_params)
in_dtype = entropy_array.dtype
if isinstance(as_dtype, str):
_as_dtype = np.dtype(as_dtype)
else:
_as_dtype = as_dtype
if normed:
if max_entropy_categories is None:
max_entropy = get_max_entropy(len(data_arrays))
else:
max_entropy = get_max_entropy(max_entropy_categories)
# We normalize the entropy by setting the in_range accordingly
if np.issubdtype(_as_dtype, np.floating) and output_range is None:
# use the normalization range [0, 1] for float output by default
output_range = (0.0, 1.0)
elif np.issubdtype(_as_dtype, np.integer) and output_range is None:
# use the general possible range for Integers
_intmax, _intmin = dtype_range(_as_dtype)
output_range = (_intmin, _intmax)
input_range = [0.0, max_entropy],
else:
if output_range is not None:
warnings.warn(
f"Calculating the entropy with {normed=} ignores "
f"{output_range=} as a non-normalized entropy value is not "
"bounded and can thus not be mapped to a data range."
)
if as_dtype is not None and str(in_dtype) != str(as_dtype):
# we do not normalize but convert
warnings.warn(
f"The computed entropy will be converted from {str(in_dtype)} "
f"to {as_dtype} without rescaling. If this is not what you "
"want set the `output_range` parameter."
)
output_range = None
input_range = None
# convert (and rescale if normed)
entropy_array = convert_to_dtype(data=entropy_array,
as_dtype=_as_dtype,
in_range=input_range,
out_range=output_range)
return entropy_array
def _get_entropy(data: NDArray,
categories: Collection | None = None,
normed: bool = False,
max_entropy_categories: int | None = None,
img_filter: Callable | None = None,
as_dtype: type | str | None = None,
output_range: tuple | None = None,
filter_params: dict | None = None,
entropy_params: dict | None = None,
filter_output_range: tuple | None = None,
**params) -> NDArray:
# NOTE: This function is only used for testing purposes.
"""
Compute the Shannon entropy per cell from a 2D categorical array.
This method performs first a gaussian blurring, followed by a per-cell
entropy calculation.
Parameters
----------
data : NDArray
2D array of integer categorical values.
categories : Collection or None
Collection of categories to extract. If None, all categories in `data` are used.
normed : bool
If True, entropy values are normalized by the maximum possible entropy
given the number of categories.
max_entropy_categories : int or None
Maximum number of categories to use for normalization. Ignored if `normed=False`.
img_filter : Callable or None
Filter function applied to the per-category arrays, e.g., `skimage.filters.gaussian`.
filter_params : dict or None
Parameters to pass to `img_filter`.
filter_output_range : tuple or None
Expected output range of the filter for proper scaling.
as_dtype : type or str or None
Output data type for the entropy array. Only applied if `normed=True`.
output_range : tuple or None
Range to rescale normalized entropy values. Only applied if `normed=True`.
entropy_params : dict or None
Additional keyword arguments passed to `compute_entropy`.
**params : dict
Additional arguments, e.g., `blur_output_dtype` for intermediate filtered arrays.
Returns
-------
NDArray
Array of the same shape as `data`, containing the per-cell Shannon entropy.
Notes
-----
- This function relies on :func:`get_filtered_categories` to create per-category arrays.
- The filtered arrays are passed to :func:`compute_entropy` to calculate entropy.
- Normalization scales values to [0,1] (or to the range of `as_dtype` if integer).
- The `as_dtype` and `output_range` parameters only affect the normalized output.
See Also
--------
:func:`get_filtered_categories` : Extract and optionally filter all category arrays.
:func:`compute_entropy` : Compute per-cell entropy over a series of data arrays.
"""
filter_params = filter_params or dict()
blur_output_dtype = params.pop('blur_output_dtype', None)
blurred_categories = get_filtered_categories(data=data,
categories=categories,
img_filter=img_filter,
filter_output_range=filter_output_range,
output_dtype=blur_output_dtype,
filter_params=filter_params)
entropy_params = entropy_params or dict()
return compute_entropy(data_arrays=tuple(blurred_categories.values()),
normed=normed,
max_entropy_categories=max_entropy_categories,
as_dtype=as_dtype,
output_range=output_range,
**entropy_params)
[docs]
def compute_interaction(data_arrays: Sequence[NDArray],
input_dtype: type | str | None = None,
standardize: bool = False,
normed: bool = True,
output_dtype: type | str | None = None,
output_range: tuple | None = None
) -> NDArray:
r"""
Compute per-cell interaction (inspired by the Simpson Index) across a series of data arrays.
The interaction is calculated as the element-wise product of the input arrays.
Optionally, the interaction can be standardized, normalized, and converted to
a specified output data type.
For float inputs:
.. math::
interaction = LC_1 \times LC_2 \times \dots \times LC_n
For integer (e.g., uint8) inputs:
.. math::
interaction = \frac{\left(\frac{LC_1}{\text{max}} \times \frac{LC_2}{\text{max}} \times \dots \right)}{(1/n^n)} \times \text{max}
Parameters
----------
data_arrays : Sequence[NDArray]
Sequence of arrays to compute per-cell interaction over. All arrays must have the same shape.
input_dtype : type or str or None, default=None
Expected data type of the input arrays. Raises an error if actual dtype does not match.
standardize : bool, default=False
If True, interaction is standardized by the sum of the layers:
:math:`interaction = \frac{A \cdot B \cdot ...}{A + B + ...}`.
normed : bool, default=True
If True, interaction values are normalized to the theoretical maximum interaction:
:math:`(1/n)^n` for n arrays.
output_dtype : type or str or None, default=None
Data type for the output array. Values are rescaled appropriately for integer outputs.
output_range : tuple or None, default=None
Target range for output values (currently used only for integer outputs; reserved for future use).
Returns
-------
NDArray
Array of the same shape as the input arrays, containing the per-cell interaction.
Notes
-----
- Standardization (`standardize=True`) scales the interaction by the sum of input layers.
- Conversion to integer types uses scaling and :func:`numpy.ceil` to avoid rounding artifacts.
- `normed=True` ensures the maximum possible interaction corresponds to 1 (float) or the maximum of the integer.
- Input/output range detection uses :func:`~riogrande.helper.dtype_range`.
See Also
--------
:func:`compute_entropy` : Compute per-cell entropy over a series of data arrays.
Examples
--------
Example 1: float inputs, 2 arrays
>>> a = np.array([[0.5, 0.25], [0.0, 0.05]])
>>> b = np.array([[0.5, 0.25], [0.0, 0.3]])
>>> compute_interaction([a, b], standardize=True, normed=True)
array([[1. , 0.5 ],
[0. , 0.17142857]])
>>> compute_interaction([a, b], standardize=True, normed=False)
array([[0.25 , 0.125 ],
[0. , 0.04285714]])
Example 2: integer inputs (uint8), 3 arrays, float output
>>> a = np.array([[85, 100], [50, 60]], dtype=np.uint8)
>>> b = np.array([[85, 50], [100, 80]], dtype=np.uint8)
>>> c = np.array([[85, 105], [100, 10]], dtype=np.uint8)
>>> compute_interaction([a, b, c], standardize=True, normed=True, output_dtype=np.float64)
array([[1. , 0.85487482],
[0.83044983, 0.13287197]])
"""
array_dtype = data_arrays[0].dtype
# define rescaling based on input type
if input_dtype is None:
input_dtype = array_dtype
if isinstance(input_dtype, str):
input_dtype = np.dtype(input_dtype)
if array_dtype != input_dtype:
raise ValueError(f"Array data type {array_dtype} does not match "
f"provided input data type {input_dtype}")
# determine scaling for input
_max_scale, _ = dtype_range(input_dtype)
if np.issubdtype(input_dtype, np.floating):
_max_scale = 1.0
# calculate the interaction (A * B)
interaction_array = np.ones_like(data_arrays[0], dtype=float)
for arr in data_arrays:
interaction_array *= (arr / _max_scale)
# standardize by the sum (A + B) -> result is A*B/(A+B)
if standardize:
standardize_array = np.zeros_like(data_arrays[0], dtype=float)
for arr in data_arrays:
standardize_array += (arr / _max_scale)
interaction_array = np.divide(interaction_array, standardize_array,
out=np.zeros_like(
interaction_array, dtype=float),
where=standardize_array != 0)
if normed:
n = len(data_arrays)
max_interaction = 1 / (n ** n)
interaction_array /= max_interaction
if output_dtype:
if isinstance(output_dtype, str):
output_dtype = np.dtype(output_dtype)
_max, _ = dtype_range(output_dtype)
if np.issubdtype(output_dtype, np.floating):
_max = 1
interaction_array = (interaction_array * _max).astype(output_dtype)
else:
# when rescaling to uint8 it is important to not have values > 1 before rescaling (else 256 would be 0)
interaction_array[interaction_array > 1] = 1
# np.ceil relevant to avoid artefacts from rounding (when using unit8)
interaction_array = np.ceil(
(interaction_array * _max)).astype(output_dtype)
return interaction_array
def _view_data(source: Source | str,
bands: list[Band | int] | None,
view: tuple[int, int, int, int],
in_range: None | NDArray | Collection,
as_dtype: type | str | None,
output_range: None | NDArray | Collection,
) -> dict:
"""
Load a view from a TIFF file and optionally convert and rescale the data.
You may use the `**tags` to specify which band to read, by default only the
first band is read. If tags for multiple bands are provided then all
matching bands are returned.
Parameters
----------
source : Source or str
The path to the TIFF file or a `Source` object to load.
bands : list[Band or int] or None
Collection of bands to read. Can be `Band` objects or band indices. If None,
all bands are loaded.
view : tuple[int, int, int, int]
A tuple defining the subset of the data to read (e.g., (x_start, y_start, x_end, y_end)).
in_range : NDArray or Collection or None
Optional input range to use for rescaling.
as_dtype : type or str or None
Optional data type to convert the returned data to. The data will be rescaled
to match this type if provided.
output_range : NDArray or Collection or None
Optional tuple to overwrite the [min,max] range of the output.
See `io.load_block` for further details.
Returns
-------
dict
A dictionary mapping each :class:`~riogrande.io.models.Band` object to a dictionary
containing:
- ``'data'``: the loaded and optionally rescaled data array
- ``'view'``: the view tuple used for loading the data
See Also
--------
:func:`_view_filtered` : Load, filter, and rescale a view from a source.
:func:`_filter_data` : Apply a filter and rescale the resulting data.
"""
# read out block from original file
if not isinstance(source, Source):
source = Source(path=source)
if bands is None:
print('No specific bands selected, using all')
bands = source.get_bands()
elif any(isinstance(band, int) for band in bands):
bands = [band if isinstance(band, Band) else source.get_band(bidx=band)
for band in bands]
data_views = dict()
for band in bands:
result = band.load_block(view=view, scaling_params=None)
data = result.pop('data')
if as_dtype is not None:
data = convert_to_dtype(data=data,
as_dtype=as_dtype,
in_range=in_range,
out_range=output_range)
data_views[band] = dict(data=data, view=view)
return data_views
def _apply_filter(data: NDArray, img_filter: Callable, **params) -> NDArray:
"""
Apply a filter function to an array.
Parameters
----------
data : NDArray
Input array to be filtered.
img_filter : Callable
A callable that accepts `data` as the first argument, along with any
keyword arguments, and returns a filtered array.
**params : dict
Additional keyword arguments passed directly to `img_filter`.
Returns
-------
NDArray
The filtered result from calling `img_filter(data, **params)`.
See Also
--------
:func:`_filter_data` : Full pipeline: NaN handling, filtering, and dtype conversion.
"""
return img_filter(data, **params)
def _filter_data(data: NDArray,
replace_nan_with: int | float | None = None,
img_filter=None,
filter_output_range: tuple | None = None,
filter_params: dict | None = None,
as_dtype: type | str | None = None,
output_range: tuple | None = None,
) -> NDArray:
"""
Apply a filter to a 2D NumPy array with optional NaN handling, type conversion,
and output rescaling.
Parameters
----------
data : NDArray
A 2D numpy array to be filtered.
replace_nan_with : int or float or None
Value to replace NaNs in `data`. If None, NaNs remain unchanged.
img_filter : Callable or None
Filter function to apply to the array (e.g., `skimage.filters.gaussian`).
filter_output_range : tuple or None
Range of values produced by the filter. Used when converting or rescaling the output.
filter_params : dict or None
Additional parameters to pass to the filter callable.
as_dtype : type or str or None
Data type for the returned array. For example, `np.float64` or `'float32'`.
output_range : tuple or None
Target range to rescale the filtered data into. Useful to map the data to [0, 1]
or any custom range.
Returns
-------
NDArray
The filtered (and optionally rescaled) array with NaNs restored if applicable.
Notes
-----
- NaNs in the original array are restored after filtering if the output dtype supports
floating-point NaNs.
- `filter_output_range` should be set when `as_dtype` or `output_range` is used to
avoid unexpected scaling.
- Uses :func:`~riogrande.helper.convert_to_dtype` for dtype conversion and rescaling.
See Also
--------
:func:`_apply_filter` : Apply a filter function to an array.
:func:`_view_filtered` : Load a view and apply filtering end-to-end.
Examples
--------
>>> from skimage.filters import gaussian
>>> data = np.array([[1, 2, 1], [np.nan, 4, 5], [2, 4, 6]] )
>>> data
array([[ 1., 2., 1.],
[nan, 4., 5.],
[ 2., 4., 6.]])
>>> _filter_data(data, replace_nan_with=0, img_filter=gaussian, filter_params={'sigma': 1})
array([[1.33293744, 1.9624767 , 2.25847273],
[ nan, 2.85862763, 3.7419668 ],
[2.27905522, 3.62953263, 4.84767823]])
"""
# check if nan exists
if np.isnan(np.sum(data)) and replace_nan_with is None:
if img_filter == gaussian: # only warn for gaussian
warnings.warn(
f"Raster array has NaN - this will crop areas where the given "
"function encounters NaNs. If needed: Set a replacement value "
f"for NaNs ({replace_nan_with=})")
# create a mask for NaN values (for restoring later)
nan_mask = np.isnan(data)
# replace nan with a provided value
if replace_nan_with is not None:
data = np.nan_to_num(data, nan=replace_nan_with)
# apply the filter if one is chosen
if img_filter is not None:
if filter_output_range is None and as_dtype is not None:
warnings.warn(
f"We are applying the filter {img_filter} and convert the "
f"resulting output to {as_dtype} without knowing the range of "
"the data produced by the filter. Rescaling to another data "
"type is likely to produce unexpected results if the input "
"range is unknown (e.g. if the filter outputs floats then the "
"entire range of float is used as input range, which is not "
"what you want if the filter produces values only in the range "
"[0, 1], for example. Please set the filter output range with "
"`filter_output_range` to avoid unpleasant surprises."
)
filter_params = filter_params or dict()
if img_filter in (gaussian, bpgaussian):
if np.issubdtype(data.dtype, np.integer) and \
filter_params.get('preserve_range', None) is None:
# the gaussian filter will rescale the input
warnings.warn(
"A gaussian filter will be applied to data of type int. "
"The gaussian filter will first rescale the data to "
"floats ([0,1] for uints and [-1,1] for ints), thus the "
"output will be of a different data type and scale. "
"To avoid this set `filter_params['preserve_range']=True`"
)
_data = _apply_filter(data, img_filter, **filter_params)
else:
_data = data
# now we convert and optionally rescale
_data = convert_to_dtype(data=_data,
as_dtype=as_dtype,
in_range=filter_output_range,
out_range=output_range)
# restore NaNs to original positions (if possible)
if np.issubdtype(_data.dtype, np.floating):
_data[nan_mask] = np.nan
return _data
def _view_filtered(source: Source | str,
view: tuple[int, int, int, int],
inner_view: tuple[int, int, int, int],
data_in_range: None | NDArray | Collection = None,
data_as_dtype: type | str | None = None,
data_output_range: None | NDArray | Collection = None,
replace_nan_with: int | float | None = None,
img_filter: Callable | None = None,
filter_params: dict | None = None,
filter_output_range: tuple | None = None,
as_dtype: type | str | None = None,
output_range: tuple | None = None,
bands: list[Band | int] | None = None,
selector_band: Band | None = None,
) -> dict:
"""
Extract a view from a source file and apply filtering and rescaling.
Parameters
----------
source : Source or str
Path to the source file or a `Source` object.
view : tuple[int, int, int, int]
Region to load from the source (x_start, y_start, x_end, y_end).
inner_view : tuple[int, int, int, int]
Subregion to extract from the filtered result.
data_in_range : NDArray or Collection or None
Input range used for loading the raw data.
data_as_dtype : type or str or None
Data type to convert the loaded data to before filtering.
data_output_range : NDArray or Collection or None
Range to rescale the data after loading.
replace_nan_with : int or float or None
Value to replace NaNs during filtering.
img_filter : Callable or None
Filter function to apply to the data.
filter_params : dict or None
Parameters to pass to the filter function.
filter_output_range : tuple or None
Expected output range of the filter for proper scaling.
as_dtype : type or str or None
Data type for the final filtered output.
output_range : tuple or None
Range to rescale the final filtered output.
bands : list[Band or int] or None
Bands to load from the source. Defaults to all available bands.
selector_band : Band or None
Band used to select regions for filtering and aggregation.
Returns
-------
dict
A dictionary with keys:
- ``'data'``: maps band indices to filtered arrays corresponding to `inner_view`
- ``'view'``: the `inner_view` tuple representing the returned region
See Also
--------
:func:`_view_data` : Load a view from a source without filtering.
:func:`_filter_data` : Apply a filter and rescale the resulting data.
"""
data_views = _view_data(source=source,
bands=bands,
view=view,
in_range=data_in_range,
as_dtype=data_as_dtype,
output_range=data_output_range)
if selector_band is not None:
selector_data = selector_band.load_block(view=view)['data']
selectors = np.unique(selector_data, ).tolist()
if np.nan in selectors:
selectors.remove(np.nan)
else:
selectors = [0, ]
selector_data = np.zeros(shape=(view[3], view[2]), dtype=np.uint8)
# get output data type for final output - to use for filtered aggregation later
if isinstance(as_dtype, str):
_as_dtype = np.dtype(as_dtype)
else:
_as_dtype = as_dtype
filtered_datas = {}
for band, data_view in data_views.items():
# We need to use the same no data for the filtering as defined in the views
_nodata = 0
dv_dtype = data_view['data'].dtype
if np.issubdtype(dv_dtype, np.floating):
_nodata = np.nan
_filtered_data = np.zeros(shape=(view[3], view[2]), dtype=_as_dtype)
for select in selectors:
_selector = np.where(selector_data == select, True, False)
select_data_view = np.where(_selector, data_view['data'], _nodata)
_part_filtered_data = _filter_data(
data=select_data_view,
replace_nan_with=replace_nan_with,
img_filter=img_filter,
filter_params=filter_params,
filter_output_range=filter_output_range,
as_dtype=as_dtype,
output_range=output_range)
_filtered_data += np.where(_selector, _part_filtered_data, 0)
# only keep the inner view
filtered_datas[band.get_bidx()] = np.copy(
get_view(_filtered_data,
relative_view(view, inner_view)
)
)
return dict(data=filtered_datas, view=inner_view)
[docs]
def view_blurred(source: str,
view: tuple[int, int, int, int],
inner_view: tuple[int, int, int, int],
categories: Collection | None,
img_filter: Callable,
filter_params: dict = dict(),
filter_output_range: tuple | None = None,
output_dtype: type | str | None = "uint8",
output_range: tuple | None = None,
**tags):
"""
Compute blurred binary arrays for each category in a categorical TIFF file.
The provided TIFF file must contain at least one band with categorical data
(e.g., of type `uint`). For each specified category, an indicator array is
created (dichotomous array marking presence/absence of that category), which
is then filtered using the provided `img_filter` function.
.. note::
This method will be moved to the `parallel` sub-module in a future release.
Parameters
----------
source : str
Path to the TIFF file to load.
view : tuple of int
A 4-tuple defining the view of the data array to update: `(start_row, end_row, start_col, end_col)`.
inner_view : tuple of int
A 4-tuple defining the inner part of the view, excluding border effects.
categories : Collection, optional
A collection of category values to extract. If `None`, all categories are processed.
img_filter : Callable
A function that will be applied to each category indicator array.
filter_params : dict, optional
Keyword arguments to pass to `img_filter`. Default is an empty dictionary.
filter_output_range : tuple, optional
Output range for the filtered arrays. If `None`, no explicit rescaling is applied.
output_dtype : type or str, optional
Data type for the returned arrays. Default is `"uint8"`.
.. note::
If provided, the output of the filter function will be rescaled to the
range of this data type. See `get_category_data` for details.
output_range : tuple, optional
Explicit output range for the filtered arrays. If not provided and the
filter produces float-type data, the range `[0, 1]` is assumed, with
values clipped to this range.
**tags : keyword arguments
Arbitrary keyword arguments to specify which band to read from the TIFF
file. See :func:`~riogrande.io.load_block` for further details.
Returns
-------
dict
Dictionary with keys:
- ``'data'``: a dictionary mapping each category to its blurred array
- ``'view'``: the `inner_view` defining the effective area of the returned arrays
See Also
--------
:func:`view_entropy` : Compute per-cell entropy for a set of category arrays.
:func:`view_interaction` : Compute per-cell interaction for a set of category arrays.
:func:`get_filtered_categories` : Extract all categories with optional filtering.
"""
# read out block from original file
result = load_block(source, view=view, scaling_params=None, **tags)
data = result.pop('data')
# print(f"{data.shape=}")
# transform = result.pop('transform')
# orig_profile = result.pop('orig_profile')
# perform blur
blurred_categories = get_filtered_categories(
data=data,
categories=categories,
img_filter=img_filter,
filter_params=filter_params,
filter_output_range=filter_output_range,
output_dtype=output_dtype,
output_range=output_range,
)
# get the relative view
for category, data in blurred_categories.items():
blurred_categories[category] = np.copy(
get_view(data,
relative_view(view, inner_view))
)
return dict(data=blurred_categories, view=inner_view)
[docs]
def view_entropy(category_arrays: dict[int, NDArray],
view: tuple[int, int, int, int],
normed: bool = True,
max_entropy_categories: int | None = None,
output_dtype: type | str | None = None,
output_range: tuple | None = None) -> dict:
"""
Compute the per-cell entropy for a set of category arrays within a specified view.
Parameters
----------
category_arrays : dict[int, NDArray]
Dictionary mapping category indices to their corresponding arrays.
view : tuple[int, int, int, int]
A tuple defining the subregion of the arrays to process (e.g., (x_start, x_end, y_start, y_end)).
normed : bool
If True, normalize the entropy values to the range [0, 1] using the maximum
possible entropy determined by `max_entropy_categories`.
max_entropy_categories : int or None
The maximum number of categories used for normalization. Ignored if `normed=False`.
output_dtype : type or str or None
Data type for the returned entropy array. If None, the dtype is inferred.
output_range : tuple or None
Range to scale the output values to, e.g., (0, 1).
Returns
-------
dict
Dictionary with keys:
- ``'data'``: NDArray of computed entropy values for the specified view.
- ``'view'``: tuple defining the original view of the data arrays.
See Also
--------
:func:`view_blurred` : Compute blurred binary arrays per category.
:func:`view_interaction` : Compute per-cell interaction for a set of category arrays.
:func:`compute_entropy` : Underlying entropy computation function.
"""
entropy_array = compute_entropy(
data_arrays=tuple(category_arrays.values()),
normed=normed,
max_entropy_categories=max_entropy_categories,
as_dtype=output_dtype,
output_range=output_range,
)
return dict(data=entropy_array, view=view)
[docs]
def view_interaction(category_arrays: dict[int, NDArray],
view: tuple[int, int, int, int],
input_dtype: type | str | None = np.uint8,
standardize: bool = False,
normed: bool = True,
output_dtype: type | str | None = None,
output_range: tuple | None = None) -> dict:
"""
Compute the per-cell interaction metric for a set of category arrays within a specified view.
The function returns a dictionary containing the computed interaction array and the
original view. Interaction values can be standardized, normalized, and returned
in a specific data type or range.
Parameters
----------
category_arrays : dict[int, NDArray]
Dictionary mapping category indices to their corresponding arrays.
view : tuple[int, int, int, int]
A tuple defining the subregion of the arrays to process (e.g., (x_start, x_end, y_start, y_end)).
input_dtype : type or str or None
Data type for input conversion before computing interactions.
standardize : bool
If True, standardize the input arrays before computing interaction.
normed : bool
If True, normalize the computed interaction values.
output_dtype : type or str or None
Data type for the returned interaction array. If None, the dtype is inferred.
output_range : tuple or None
Range to scale the output values to, e.g., (0, 1).
Returns
-------
dict
Dictionary with keys:
- ``'data'``: NDArray of computed interaction values for the specified view.
- ``'view'``: tuple defining the original view of the data arrays.
See Also
--------
:func:`view_blurred` : Compute blurred binary arrays per category.
:func:`view_entropy` : Compute per-cell entropy for a set of category arrays.
:func:`compute_interaction` : Underlying interaction computation function.
"""
interaction_array = compute_interaction(
data_arrays=tuple(category_arrays.values()),
input_dtype=input_dtype,
standardize=standardize,
normed=normed,
output_dtype=output_dtype,
output_range=output_range,
)
return dict(data=interaction_array, view=view)
[docs]
def get_entropy_view(source: str,
view: tuple[int, int, int, int],
inner_view: tuple[int, int, int, int],
categories: Collection,
img_filter,
filter_params: dict = dict(),
max_entropy_categories: int | None = None,
blur_as_int: bool | None = None,
filter_output_range: tuple | None = None,
blur_output_dtype: type | str | None = None,
output_dtype: type | str | None = None,
output_range: tuple | None = None,
normed: bool = True,
**tags):
"""Returns the entropy for some categories over a view from a tif file
.. warning::
This function is deprecated and should not be used
Parameters
----------
source : str
Path to the TIFF file to load.
view : tuple of int
A 4-tuple defining the view of the data array to update: `(start_row, end_row, start_col, end_col)`.
inner_view : tuple of int
A 4-tuple defining the inner part of the view, excluding border effects.
categories : Collection, optional
A collection of category values to extract. If `None`, all categories are processed.
img_filter : Callable
A function that will be applied to each category indicator array.
filter_params : dict, optional
Keyword arguments to pass to `img_filter`. Default is an empty dictionary.
filter_output_range : tuple, optional
Output range for the filtered arrays. If `None`, no explicit rescaling is applied.
view : tuple[int, int, int, int]
A tuple defining the subregion of the arrays to process (e.g., (x_start, x_end, y_start, y_end)).
max_entropy_categories : int or None
If normed is true, this determines the maximum n for Entropy to be used to caluclate the maximum to norm by.
This argument is ignored if `normed=False`.
output_dtype : type or str or None
Data type for the returned entropy array. If None, the dtype is inferred.
output_range : tuple or None
The data-range to use for the returned array.
.. note::
This argument is only taken into account if `normed=True`.
normed : bool
If True, normalize the entropy values to the range [0, 1] using the maximum
possible entropy determined by `max_entropy_categories`.
**tags : dict
Arbitrary number of keyword arguments to describe the band to select.
See :func:`~riogrande.io.load_block` for further details.
See Also
--------
:func:`view_blurred` : Compute blurred binary arrays per category.
:func:`view_entropy` : Compute per-cell entropy for a set of category arrays.
"""
warnings.warn("This function is deprecated and will be removed",
category=DeprecationWarning)
if blur_as_int is None:
assert blur_output_dtype is not None
else:
if blur_as_int:
blur_output_dtype = "uint8"
else:
blur_output_dtype = "float64"
blurred_data = view_blurred(
source=source,
view=view,
inner_view=inner_view,
categories=categories,
img_filter=img_filter,
filter_params=filter_params,
filter_output_range=filter_output_range,
output_dtype=blur_output_dtype,
**tags
)
assert blurred_data['view'] == inner_view
entropy_view = view_entropy(category_arrays=blurred_data['data'],
view=blurred_data['view'],
output_dtype=output_dtype,
output_range=output_range,
normed=normed,
max_entropy_categories=max_entropy_categories,
)
entropy_view['view'] = blurred_data['view']
return entropy_view