Source code for coonfit.inference

"""
This module contains functions to facilitate carrying out various inference
methods.

In particular, it implements a multiple linear regression approach that allows
to use categorical and any other type of maps as predictors for some response
variable that is also provided as a map.

An exemplary use case is the usage of land-cover types and various derivatives
thereof as predictors for NDVI or land surface temperature.

"""
from __future__ import annotations

import numpy as np
from numpy.typing import NDArray
from rasterio.windows import Window

from operator import mul
from collections.abc import Collection

from sklearn.linear_model import LinearRegression

from riogrande.helper import (
    check_compatibility,
    view_to_window,
    convert_to_dtype
)
from riogrande.io import Source, Band

from .helper import (
    usable_pixels_info,
    usable_pixels_count,
)


def _to_numpy_selector(rasterio_mask: NDArray) -> NDArray:
    """
    Convert rasterio mask to a boolean selector array.

    Converts a rasterio mask (e.g., from `read_masks(band)`) into a boolean
    numpy array suitable for indexing. The returned array is an inverted mask
    where `True` indicates usable cells and `False` indicates masked cells.

    Parameters
    ----------
    rasterio_mask : NDArray
        Mask array as returned by :meth:`rasterio.io.DatasetReader.read_masks`.
        Non-zero values indicate valid data; zero values indicate masked data.

    Returns
    -------
    NDArray of bool
        Boolean array with the same shape as `rasterio_mask`. `True` values
        indicate cells that can be used; `False` values indicate cells that
        should be excluded.

    See Also
    --------
    :func:`_enrich_selector` : Refine a selector by combining predictor masks.
    :func:`prepare_selector` : Build a combined selector from response and predictor masks.
    """
    return np.where(rasterio_mask != 0, True, False)


def _enrich_selector(selector: NDArray, *predictors: Band, verbose: bool = False) -> NDArray:
    """Refine selector array by combining masks from predictor bands.

    Updates the input selector by applying logical AND operations with masks
    extracted from each predictor band. This ensures only cells that are valid
    across all predictors are selected.

    Parameters
    ----------
    selector : NDArray of bool
        Boolean array indicating usable cells in a 2D raster. `True` values
        indicate cells that can be used; `False` values indicate cells that
        should be excluded.
    *predictors : Band
        Variable number of :class:`~riogrande.io.models.Band` objects, each specifying one or more
        predictor variables. Bands sharing the same mask reader are grouped
        together for efficiency. See :func:`prepare_predictors` for more details.
    verbose : bool, optional
        If True, print runtime information including predictor names, mask
        readers, and usable pixel counts. Default is False.

    Returns
    -------
    NDArray of bool
        Boolean array with the same shape as `selector`. Contains the logical
        AND of the input selector and all predictor masks, indicating cells
        that are valid across all inputs.

    See Also
    --------
    :func:`_to_numpy_selector` : Convert a rasterio mask to a boolean selector.
    :func:`prepare_selector` : Build a combined selector from response and predictor masks.
    """
    aggr_selector = np.copy(selector)
    pred_mask_readers = dict()
    for predictor in predictors:
        pred_mask_reader = predictor.get_mask_reader()
        if pred_mask_reader in pred_mask_readers:
            pred_mask_readers[pred_mask_reader].append(predictor)
        else:
            pred_mask_readers[pred_mask_reader] = [predictor, ]
    # print(f"{pred_mask_readers=}")

    for mask_reader in pred_mask_readers:
        with mask_reader() as read_mask:
            _pred_mask = read_mask()
            all_pixels = mul(*_pred_mask.shape)
        pred_selector = _to_numpy_selector(_pred_mask)
        if verbose:
            data_pixels = usable_pixels_count(pred_selector)
            print("##########")
            _pred_string = '- ' + '\n\t- '.join(map(str, pred_mask_readers[mask_reader]))
            print(f"Predictor(s):\n\t{_pred_string}")
            print(f"\tUse mask: {mask_reader}")
            usable_pixels_info(all_pixels, data_pixels)
        np.logical_and(aggr_selector, pred_selector, out=aggr_selector)
    return aggr_selector


[docs] def prepare_selector(response: str | Band, *predictors: Band, extra_masking_band: Band | None = None, verbose=False) -> NDArray: """ Create a boolean selector based on the masks of response and predictors. The selector is a boolean array indicating which pixels can be used (True) and which cannot (False) based on the combined masks of all input bands. Parameters ---------- response : str or Band A Band object or path string describing the response data. If a string is provided, it will be converted to a Band object with bidx=1. *predictors : Band Variable number of Band objects, each specifying one or more predictor variables. Their masks will be combined with the response mask. extra_masking_band : Band, optional Additional Band object treated as a rasterio mask, where values equal to 0 will be masked out. Default is None. verbose : bool, optional If True, prints runtime information about usable pixels at each masking stage. Default is False. Returns ------- NDArray of bool Boolean array of the same shape as the response band, where True indicates usable pixels and False indicates masked pixels. Notes ----- The selector combines masks using logical AND operations, meaning a pixel is only usable (True) if it is valid across all input bands. The mask reader can be configured using :meth:`~riogrande.io.models.Band.set_mask_reader` with ``use='band'`` or ``use='source'``. See Also -------- :func:`_enrich_selector` : Refine a selector using predictor masks. :func:`init_X` : Initialize the predictor matrix using the selector. :func:`prepare_predictors` : High-level function that calls this internally. Examples -------- >>> selector = prepare_selector(response_band, predictor1, predictor2, verbose=True) >>> valid_data = response_data[selector] """ if not isinstance(response, Band): response = Band(source=Source(path=response), bidx=1) # Note: we can set which reader to use with responze.set_mask_reader(use='band'/'source') response_mask_reader = response.get_mask_reader() src_profile = response.source.import_profile() with response_mask_reader() as read_mask: mask = read_mask() src_width = src_profile["width"] src_height = src_profile["height"] all_pixels = src_width * src_height selector = _to_numpy_selector(mask) if verbose: print("\nResponse data:") data_pixels = usable_pixels_count(selector) usable_pixels_info(all_pixels, data_pixels) # now handle the extra mask band if extra_masking_band is not None: extra_selector = _to_numpy_selector(extra_masking_band.get_data()) # combine with logical and, as only both True should lea to True selector = np.logical_and(selector, extra_selector) if verbose: print("\nResponse data after extra masking:") data_pixels = usable_pixels_count(selector) usable_pixels_info(all_pixels, data_pixels) # first we get all the masks to compute an overall mask aggr_selector = _enrich_selector(selector, *predictors, verbose=verbose) data_pixels = usable_pixels_count(aggr_selector) if verbose: print("\nFinal selector:\n") usable_pixels_info(all_pixels, data_pixels) return aggr_selector
[docs] def init_X(predictors: Collection[Band], selector: NDArray, window: Window | None, include_intercept: bool, as_dtype: type | str) -> NDArray: """ Initialize the predictor matrix :math:`X` with appropriate dimensions. Creates an empty predictor matrix with rows corresponding to usable pixels (as determined by the selector) and columns for each predictor band, plus an optional intercept column. Parameters ---------- predictors : Collection of Band Collection of Band objects, each specifying one or more predictor variables. The number of predictors determines the number of columns in the output matrix (excluding the intercept). selector : NDArray of bool Boolean array to select usable cells. Only pixels where selector is True will be included in the matrix rows. window : Window or None Limits the data array to a specific spatial window. If provided, the window is converted to slices using :meth:`rasterio.windows.Window.toslices`. If None, the entire selector array is used. include_intercept : bool If True, adds an extra column of ones at the end of the matrix for fitting intercept terms in regression models. as_dtype : type or str Data type for the output array. Can be a numpy dtype or string specification (e.g., 'float64', np.float32). Returns ------- NDArray Zero-initialized array of shape (n_rows, n_cols) where: - n_rows is the count of usable pixels in the (windowed) selector - n_cols is ``len(predictors) + 1`` if include_intercept else ``len(predictors)`` Notes ----- The ``window`` parameter is intended for parallelized processing where different processes handle different spatial subsets of the data. If the window slicing results in an IndexError (e.g., window is completely outside the selector bounds), the function returns an array with 0 rows. See Also -------- :func:`populate_X` : Fill the initialized predictor matrix with data. :func:`partial_X` : Initialize and populate a predictor matrix in one step. """ if window is not None: partial = window.toslices() else: partial = slice(None) try: nbr_rows = usable_pixels_count(selector[partial]) except IndexError: nbr_rows = 0 nbr_cols = len(predictors) if include_intercept: nbr_cols += 1 # create emtpy predictor array return np.zeros((nbr_rows, nbr_cols), dtype=np.dtype(as_dtype))
[docs] def populate_X(X: NDArray, predictors: Collection[Band], as_dtype: type | str, window: Window | None, selector: NDArray, include_intercept: bool): """ Populate predictor matrix :math:`X` with data from predictor bands. Reads data from each predictor band, applies the selector mask within the specified window, and fills the columns of matrix :math:`X`. Optionally adds an intercept column of ones. Parameters ---------- X : NDArray Pre-initialized array to be populated with predictor data. Modified in-place. Should have shape (n_usable_pixels, n_predictors) or (n_usable_pixels, n_predictors + 1) if include_intercept is True. predictors : Collection of Band Collection of Band objects, each specifying one predictor variable. Data from each band will be read and placed in the corresponding column of :math:`X`. as_dtype : type or str Target data type for predictor values. Converted using :func:`~riogrande.helper.convert_to_dtype` without rescaling (``in_range`` and ``out_range`` are ``None``). window : Window or None Limits data reading to a specific spatial window. If provided, the window is converted to slices using :meth:`rasterio.windows.Window.toslices`. If None, the entire data array is used. selector : NDArray of bool Boolean array to select usable pixels. Applied after windowing to extract only valid data points for the predictor matrix. include_intercept : bool If True, the last column of X is filled with ones to represent the intercept term in regression models. Returns ------- None X is modified in-place. See Also -------- :func:`init_X` : Allocate the empty predictor matrix. :func:`partial_X` : Initialize and populate in one step. """ # Create a window if window is not None: _selector = selector[window.toslices()] else: _selector = selector for i, predictor in enumerate(predictors): with predictor.data_reader() as read: pred_data = read(window=window) # perform type conversion without any rescaling pred_data_converted = convert_to_dtype(pred_data, as_dtype=as_dtype, in_range=None, out_range=None) X[:, i] = pred_data_converted[_selector].reshape(1, -1) # apply the mask and populate X # attach intercept column (of 1s) if chosen if include_intercept: X[:, -1] = 1.0
[docs] def prepare_predictors(response: str | Band, *predictors: Band | str, view: tuple[int, int, int, int] | None = None, include_intercept=True, verbose: bool = False): """ Generate the predictor matrix and response vector for multiple linear regression. This function constructs the predictor matrix :math:`X` and the response vector :math:`y` used in a multiple linear regression model of the form .. math:: y = X\\beta + \\epsilon where :math:`y` represents the response data and :math:`X` the predictor matrix. The response data are used as a reference to determine valid pixels. A mask is extracted from the response (e.g., nodata values or an 8-bit mask) and applied to all predictors. Predictor-specific missing values are also added to the mask, ensuring that only pixels with complete information across all variables are included in the regression. This masking procedure reduces the number of pixels used in the analysis, yielding a smaller but denser predictor matrix. Notes ----- An :exc:`~coonfit.exceptions.InferenceError` is raised if an invalid predictor is detected. A predictor is considered invalid in the following cases: * After masking, the predictor contains only zeros. * The predictor represents categorical data where all selected pixels \ belong to a category and ``include_intercept`` is ``True``. No general test for linear dependence between predictors is performed. Parameters ---------- response : str or Band Response variable. Either a ``Band`` object or a string specifying the path to a raster file (``.tif``) containing the response data. If a string is provided, the raster must contain exactly one band. *predictors : Band or str One or more predictors specified as ``Band`` objects or file paths. If a string is provided, it is interpreted as the path to a raster file and **all bands** in that file are added as individual predictors. Predictor data are cast to the same data type as the response. No rescaling is performed. view : tuple of int, optional Spatial subset of the data specified as ``(x, y, width, height)``. If not provided, the entire response raster is used. include_intercept : bool, default=True If ``True``, an additional column of ones is appended to the predictor matrix to model an intercept term. verbose : bool, default=False If ``True``, print processing information. Returns ------- X : NDArray of shape (n_samples, n_features) Predictor matrix. The number of features corresponds to the total number of predictors, plus one if ``include_intercept`` is ``True``. y : NDArray of shape (n_samples,) Response vector containing the response values corresponding to the selected pixels. See Also -------- :func:`prepare_selector` : Build the boolean selector used internally. :func:`init_X` : Allocate the predictor matrix. :func:`transposed_product` : Compute X.T @ X for large spatial datasets. """ if not isinstance(response, Band): response = Band(source=Source(path=response), bidx=1) # make sure the Source profile is updated response_profile = response.source.import_profile() response_dtype = response_profile['dtype'] # make sure the predictors are bands _predictors = [] for pred in predictors: if not isinstance(pred, Band): _source = Source(path=pred) _bands = _source.get_bands() _predictors.extend(_bands) else: _predictors.append(pred) # get all paths and check the compatibility _sources = [response.source.path, ] _sources.extend( [pred.source.path for pred in _predictors] ) # make sure used tif files are compatible (i.e. check crs and units) check_compatibility(*set(_sources)) # extract the mask from response and enrich it with masks from predictors aggr_selector = prepare_selector(response, *_predictors, verbose=verbose) riow = view_to_window(view) # create empty predictor array X = init_X(_predictors, selector=aggr_selector, window=riow, include_intercept=include_intercept, as_dtype=response_dtype) populate_X(X=X, predictors=_predictors, window=riow, selector=aggr_selector, include_intercept=include_intercept, as_dtype=response_dtype) # return predictor matrix and the response vector y = partial_response(response=response, window=riow, selector=aggr_selector) return X, y
[docs] def transposed_product(predictors: Collection[Band], view: tuple[int, int, int, int] | None, selector: NDArray, include_intercept: bool = False, as_dtype: str | type = "float64"): """ Compute the transposed product :math:`X^T X` for a set of predictors. This function extracts predictor values within a specified spatial view, applies a boolean selector to filter valid pixels, constructs the predictor matrix :math:`X`, and returns its transposed product. The result is commonly used in linear regression for computing normal equations. Parameters ---------- predictors : Collection[Band] Collection of ``Band`` objects defining the predictor variables. view : tuple of int or None Spatial subset specified as ``(x, y, width, height)``. If ``None``, the full spatial extent is used. selector : NDArray of bool Boolean array indicating which pixels are valid and should be included in the computation. include_intercept : bool, default=False If ``True``, include an additional column of ones in the predictor matrix to model an intercept term. as_dtype : str or type, default="float64" Data type of the resulting array. Returns ------- transprodX : NDArray of shape (n_features, n_features) The transposed product of the predictor matrix, ``X.T @ X``. The number of features corresponds to the number of predictors, plus one if ``include_intercept`` is ``True``. See Also -------- :func:`get_optimal_weights` : Compute regression weights from X and y. :func:`get_optimal_weights_source` : Compute weights using precomputed inverse. :func:`partial_X` : Generate the predictor matrix ``X``. """ riow = view_to_window(view) X = partial_X(predictors=predictors, window=riow, selector=selector, include_intercept=include_intercept, as_dtype=as_dtype) # create transprodX matrix: # ### # this is equivalent transprodX = X.T @ X # to this: # nbr_pred = X.shape[1] # transprodX = np.zeros((nbr_pred, nbr_pred), dtype=as_dtype) # for i in range(nbr_pred): # col1 = X[:,i] # for j in range(i, nbr_pred): # col2 = X[:,j] # transprodX[i,j] = np.sum(np.multiply(col1, col2)) # ### return transprodX
[docs] def get_optimal_weights(X, y): """ Compute the optimal regression weights for a multiple linear regression. The multiple linear regression model is defined as .. math:: y = X\\beta + \\epsilon where :math:`X` is the predictor matrix, :math:`y` the response vector, :math:`\\beta` the vector of regression weights, and :math:`\\epsilon` a random error term. The optimal least-squares solution for :math:`\\beta` is given by .. math:: \\beta = (X^T X)^{-1} X^T y which is computed directly using NumPy linear algebra routines. Notes ----- * The matrix :math:`X^T X` may be singular or ill-conditioned, in which case the inverse does not exist or the solution may be numerically unstable. * In such cases, alternative approaches such as singular value decomposition (SVD) or :func:`numpy.linalg.lstsq` are recommended. Parameters ---------- X : NDArray of shape (n_samples, n_features) Predictor matrix where each row corresponds to an observation and each column to a predictor variable. y : NDArray of shape (n_samples,) Response vector. Returns ------- beta : NDArray of shape (n_features,) Optimal regression weights minimizing the least-squares error. See Also -------- :func:`get_approx_weights` : Estimate weights via scikit-learn's LinearRegression. :func:`transposed_product` : Compute X.T @ X from spatial band data. :func:`prepare_predictors` : Build X and y from raster bands. """ return (np.linalg.inv(X.T @ X) @ X.T) @ y
[docs] def partial_response(response: str | Band, window: Window | None, selector: NDArray): """ Extract and return the response values within a window after applying a selector. This function reads the response raster data, optionally restricts it to a spatial window, and applies a boolean selector to return only the valid response values. The resulting array is suitable for use as the response vector in regression analyses. Parameters ---------- response : str or Band Response variable specified either as a ``Band`` object or as the path to a raster file (``.tif``). If a string is provided, the raster must contain a single band. window : rasterio.windows.Window or None Spatial window defining the subset of the response raster to read. If ``None``, the full raster extent is used. selector : NDArray of bool Boolean array used to select valid pixels. If ``window`` is provided, the selector is sliced accordingly before being applied. Returns ------- y : NDArray of shape (n_samples,) One-dimensional array containing the selected response values. See Also -------- :func:`partial_X` : Generate the corresponding predictor matrix. :func:`prepare_predictors` : Build X and y together from raster bands. """ if not isinstance(response, Band): response = Band(source=Source(path=response), bidx=1) if window is not None: _selector = selector[window.toslices()] else: _selector = selector with response.data_reader(mode='r') as read: response_data = read(window=window) return response_data[_selector]
[docs] def partial_X(predictors: Collection[Band], window: Window | None, selector: NDArray, include_intercept: bool, as_dtype: type | str): """ Generate a (partial) predictor matrix :math:`X`. This function constructs the predictor matrix from a collection of raster bands, optionally restricted to a spatial window. A boolean selector is applied to include only valid pixels. The function is intended to support partial or chunked processing of predictors, for example in parallelized workflows. Parameters ---------- predictors : Collection[Band] Collection of ``Band`` objects defining the predictor variables. Individual bands may represent continuous or categorical predictors. window : rasterio.windows.Window or None Spatial window defining the subset of predictor data to read. If ``None``, the full spatial extent is used. Notes ----- This argument is primarily intended to enable partial processing of predictors, e.g. in parallel or tiled computations. selector : NDArray of bool Boolean array used to select valid pixels from the predictor data. include_intercept : bool If ``True``, an additional column of ones is appended to the predictor matrix to model an intercept term. as_dtype : str or type Data type used for the resulting predictor matrix. This is also the type used to represent categorical predictors, if present. Returns ------- X : NDArray of shape (n_samples, n_features) Predictor matrix containing the selected predictor values. The number of features corresponds to the number of predictors, plus one if ``include_intercept`` is ``True``. See Also -------- :func:`init_X` : Allocate the empty predictor matrix. :func:`populate_X` : Fill the predictor matrix with data. :func:`transposed_product` : Compute X.T @ X using this function internally. """ X = init_X(predictors, selector=selector, window=window, include_intercept=include_intercept, as_dtype=as_dtype, ) populate_X(X=X, predictors=predictors, window=window, selector=selector, include_intercept=include_intercept, as_dtype=as_dtype) return X
[docs] def get_optimal_weights_source(Y: NDArray, response: str | Band, predictors: Collection[Band], view: tuple[int, int, int, int] | None, selector, include_intercept: bool = False, as_dtype="float64") -> dict[Band, float]: """ Compute optimal regression weights directly from predictors and a precomputed inverse transposed product. The multiple linear regression model is defined as .. math:: y = X\\beta + \\epsilon The least-squares solution for the regression weights is .. math:: \\hat{\\beta} = (X^T X)^{-1} X^T y Defining .. math:: Y = (X^T X)^{-1} this function computes .. math:: \\hat{\\beta} = Y X^T y directly from the predictor data and the response values, without explicitly recomputing :math:`X^T X`. Parameters ---------- Y : NDArray of shape (n_features, n_features) Inverse of the transposed product of the predictor matrix, :math:`(X^T X)^{-1}`. Notes ----- Typically, ``Y`` is obtained by inverting the output of :func:`transposed_product` via :func:`numpy.linalg.inv`. response : str or Band Response variable specified either as a ``Band`` object or as the path to a raster file (``.tif``). If a string is provided, the raster must contain a single band. predictors : Collection[Band] Collection of ``Band`` objects defining the predictor variables. view : tuple of int or None Spatial subset specified as ``(x, y, width, height)``. If ``None``, the full spatial extent is used. selector : NDArray of bool Boolean array indicating which pixels are valid and should be included in the regression. include_intercept : bool, default=False If ``True``, include an intercept term in the regression. The intercept weight is returned under the key ``'intercept'``. as_dtype : str or type, default="float64" Data type used for constructing the predictor matrix. Returns ------- weights : dict Dictionary mapping each predictor to its fitted regression weight. If ``include_intercept`` is ``True``, an additional key ``'intercept'`` is included. See Also -------- :func:`transposed_product` : Compute X.T @ X, whose inverse is ``Y``. :func:`get_optimal_weights` : Direct computation from X and y. :func:`partial_X` : Generate the partial predictor matrix used here. """ riow = view_to_window(view) # First part_X = partial_X(predictors=predictors, window=riow, selector=selector, include_intercept=include_intercept, as_dtype=as_dtype) part_y = partial_response(response=response, window=riow, selector=selector) betas = Y @ part_X.T @ part_y if include_intercept: pred_list = list(predictors) pred_list.append('intercept') predictors = tuple(pred_list) if len(betas) != len(predictors): raise ValueError(f"Number of predictors {len(predictors)} not equal with number of fitted values {len(betas)}") return {pred: beta for pred, beta in zip(predictors, betas)}
[docs] def get_approx_weights(X: NDArray, y: NDArray, fit_intercept: bool = False) -> LinearRegression: """ Estimate regression weights using numerical optimization. This function fits a multiple linear regression model of the form .. math:: y = X\\beta + \\epsilon where :math:`X` is the predictor matrix, :math:`y` the response vector, :math:`\\beta` the vector of regression weights, and :math:`\\epsilon` a random error term. The weights are estimated using scikit-learn’s :class:`sklearn.linear_model.LinearRegression` estimator, which computes a least-squares solution using numerical linear algebra routines. Parameters ---------- X : NDArray of shape (n_samples, n_features) Predictor matrix where each row corresponds to an observation and each column to a predictor variable. y : NDArray of shape (n_samples,) Response vector. fit_intercept : bool, default=False Whether to fit an intercept term. Notes ----- If ``X`` was constructed using :func:`prepare_predictors` with ``include_intercept=True``, then ``fit_intercept`` should be set to ``False`` to avoid fitting the intercept twice. Returns ------- regression : :class:`sklearn.linear_model.LinearRegression` Fitted linear regression model. See Also -------- :func:`get_optimal_weights` : Analytic least-squares solution (normal equations). :func:`prepare_predictors` : Build X and y from raster bands. """ regression = LinearRegression(fit_intercept=fit_intercept) regression.fit(X, y) return regression