"""
High-level parallelized workflows for spatial multiple linear regression.
This module exposes the main user-facing functions of the ``coonfit`` package.
All computations are distributed across worker processes and operate on raster
data block by block, making them suitable for large spatial datasets that do
not fit in memory.
Key functions:
- :func:`compute_weights` — Full end-to-end workflow: builds the selector mask,
validates predictors, computes :math:`X^T X`, checks for linear dependencies,
inverts the matrix, and returns the optimal regression weights.
- :func:`compute_model` — Applies fitted weights to predictor rasters and writes
the model prediction to a new GeoTIFF.
- :func:`get_XT_X` — Parallelized computation of the transposed product
:math:`X^T X` across spatial blocks.
- :func:`get_optimal_betas` — Parallelized computation of regression coefficients
given a pre-inverted :math:`(X^T X)^{-1}`.
- :func:`get_XT_X_dependency` — Checks for linear dependencies among predictors
without running the full fitting pipeline.
- :func:`calculate_rmse` — Computes Root Mean Square Error (RMSE) between the model and
observed response.
- :func:`calculate_r2` — Computes the coefficient of determination (:math:`R^2`).
"""
from __future__ import annotations
from collections.abc import Collection
import warnings
from typing import Union
import numpy as np
from multiprocessing import Manager
from numpy.typing import NDArray
from riogrande.io import Source, Band
from riogrande.helper import (
view_to_window,
check_compatibility,
get_or_set_context,
get_nbr_workers,
)
from riogrande.prepare import (
create_views,
)
from riogrande import parallel as rgpara
from .helper import check_rank_deficiency
from . import parallel_helpers as lph
[docs]
def compute_model(predictors: Collection[Band],
optimal_weights: dict[dict] | dict | None,
output_file: str,
block_size: tuple[int, int],
predictors_as_dtype: str | type | None = None,
profile: dict | None = None,
selector: NDArray[np.bool_] | None = None,
selector_band: Band | None = None,
verbose: bool = False,
**params) -> str:
"""
Create a ``.tif`` file with the model prediction values from a fitted model.
Parameters
----------
predictors : Collection of Band
Collection of predictor bands used in the multiple linear regression.
optimal_weights : dict of dict, dict, or None
Holding for each predictor the optimal weight.
If weights include a key named "intercept",
this will be used as the intercept (beta0) for the model prediction.
If a `selector_band` is provided, then it must hold for each
categorical value (key) a dictionary with the optimal weights per
predictor.
See the `selector_band` parameter for more details.
output_file : str
Path to where the model result should be written to.
block_size : tuple of int
Size (width, height) in pixels of the block that a single job
processes.
predictors_as_dtype : str, type, or None, optional
Datatype to convert predictor input to (e.g. np.float32) prior to
computing their contribution.
.. note::
Only a type conversion is supported prior to computing the predictors
contribution. Rescaling of a predictor needs to happen in a separate
step beforehand.
profile : dict or None, optional
The profile to use for the newly created output tif.
By default the profile is copied from the first source of the
predictor bands, updating the count to 1.
selector : NDArray of bool or None, optional
A selector array to use to selectively calculate the model prediction.
If a boolean array is provided then it is applied as an (inverted) mask:
only pixels that result to `True` are calculated.
If a categorical array (np.uint8) is provided instead, then it is assumed
that the `optimal_weights` use it as a selector for (masking) the
processing of the model.
selector_band : Band or None, optional
A band object with categorical data.
If provided, the `optimal_weights` needs to hold for each of the category
a dictionary with predictor specific weights.
verbose : bool, optional
Print out processing steps. Default is False.
**params : dict
Optional arguments for the multiprocessing:
- nbrcpu : int, optional
Number of CPUs to use for parallel processing.
- start_method : str, optional
Multiprocessing start method ('fork', 'spawn', or 'forkserver').
- compress : bool, optional
If True, apply LZW compression to the final output file.
Returns
-------
output_file : str
Path to the newly created tif file holding the model prediction data.
If compression is enabled, this will be the path to the compressed file.
Notes
-----
The function uses multiprocessing to process the image in blocks for
improved performance. Each block is processed independently by
:func:`~coonfit.parallel_helpers._block_model_prediction` and results
are aggregated by :func:`~riogrande.parallel.combine_views` into the
final output file. Block decomposition uses :func:`~riogrande.prepare.create_views`.
Worker pool is sized via :func:`~riogrande.helper.get_nbr_workers`
and created with :func:`~riogrande.helper.get_or_set_context`.
Timing information for each job and the total duration is printed to
standard output upon completion.
See Also
--------
:func:`compute_weights` : Compute the optimal weights used as input here.
:func:`calculate_rmse` : Evaluate the model quality after calling this function.
:func:`calculate_r2` : Compute R² for the model output.
"""
# get all source files
sources = tuple(set(pred.source.path for pred in predictors))
check_compatibility(*sources)
# Handle the output profile
if profile is None:
profile = Source(path=sources[0]).import_profile()
profile['count'] = 1
# make sure to get the data output type from the profile
as_dtype = profile.get('dtype')
# Note: with profile one could provide invalid dimensions
height = profile['height']
width = profile['width']
out_band = None # cerate a new band bidx=1
tags = dict(category='model')
# Parameter for the aggregation task
combine_params = dict(
profile=profile,
output_file=output_file,
band=out_band,
tags=tags
)
# Parameter for the individual jobs
_, inner_views = create_views(view_size=block_size,
border=(0, 0),
size=(width, height))
block_params = []
for view in inner_views:
bparams = dict(view=view,
predictors=predictors,
predictors_as_dtype=predictors_as_dtype,
selector=selector,
selector_band=selector_band,
optimal_weights=optimal_weights,
as_dtype=as_dtype,
)
block_params.append(bparams)
# ###
# prepare multiprocessing
# ###
manager = Manager()
job_out_q = manager.Queue()
start_method = params.get('start_method', None)
# get number of workers
nbr_workers = get_nbr_workers(number=params.pop('nbrcpu', None))
if verbose:
print(f"using {nbr_workers=}")
with get_or_set_context(start_method).Pool(nbr_workers) as pool:
# start the aggregator task
combiner_job = pool.apply_async(rgpara.combine_views,
(combine_params, job_out_q))
# start the block processing
all_jobs = []
for bparams in block_params:
all_jobs.append(pool.apply_async(lph._block_model_prediction,
(bparams, job_out_q)))
# collect results
job_timers = []
for job in all_jobs:
# await for the jobs to return (i.e. complete) by calling .get
# get the duration from the timer object that is returned by .get()
job_timers.append(job.get().get_duration())
# once we have all the blocks, add a last element to the queue to stop
# the combination process
job_out_q.put(dict(signal='kill'))
pool.close()
# wait for the *_combiner tasks to finish
pool.join()
# lzw-compress final output
compress = params.pop('compress', False)
if compress:
out_source = Source(output_file)
out_source.compress(output=None)
output_file = str(out_source.path)
print("Files compressed successfully")
total_duration = combiner_job.get().get_duration()
print(f"{total_duration=}")
print(f"maximal duration of single job: {max(job_timers)=}")
return output_file
[docs]
def get_XT_X(response: str | Band,
*predictors: Band | str,
selector: NDArray,
include_intercept: bool = True,
verbose: bool = False,
**mpc_params
) -> np.ndarray:
"""
Calculate :math:`X^T X` matrix in parallel from predictor data blocks.
This function computes the transpose-product matrix :math:`X^T X` used in
linear regression by processing the predictor data in parallel blocks.
The response parameter is only used to determine the spatial dimensions
of the computation.
Parameters
----------
response : str or Band
Path to a tif file or Band object containing the response data.
Only the spatial dimensions (width, height) are used; the actual
response values are not required for this computation.
*predictors : Band or str
Variable number of predictor bands to include in the design matrix X.
Can be Band objects or paths to source files.
See :func:`~coonfit.inference.prepare_predictors` for details on predictor
specification.
selector : NDArray
Boolean array (:class:`numpy.bool_`) to select usable cells. Must have the same
spatial dimensions as the response. True values indicate pixels to
include in the computation.
include_intercept : bool, optional
If True, append a column of ones to the design matrix X to fit an
intercept term. Default is True.
verbose : bool, optional
If True, print runtime information including number of workers used.
Default is False.
**mpc_params : dict
Multiprocessing configuration parameters.
Required:
- view_size : tuple of int
Size (width, height) in pixels of a single view/block to process.
Optional:
- nbrcpu : int, optional
Number of CPUs to use. If not set, uses (available threads - 1).
- start_method : str, optional
Starting method for multiprocessing ('fork', 'spawn', or
'forkserver').
Returns
-------
XT_X : NDArray
The transpose-product matrix :math:`X^T X` of shape (n_predictors, n_predictors).
If `include_intercept=True`, the shape is (n_predictors+1, n_predictors+1)
with the intercept column included as the last row and column.
Notes
-----
The function implements parallel computation by:
1. Dividing the spatial domain into non-overlapping blocks via
:func:`~riogrande.prepare.create_views`
2. Computing partial :math:`X^T X` matrices for each block in parallel via
:func:`~coonfit.parallel_helpers._partial_transposed_product`
3. Aggregating the partial results via
:func:`~coonfit.parallel_helpers._combine_matrices`
This approach is memory-efficient for large spatial datasets as it avoids
loading the entire design matrix into memory at once.
The selector mask is applied consistently across all blocks to ensure
only valid pixels contribute to the computation.
See Also
--------
:func:`get_optimal_betas` : Compute beta coefficients from ``X.T @ X``.
:func:`compute_weights` : Full workflow wrapping this function.
"""
print(f'get_XT_X - {response=}, {predictors=}')
if not isinstance(response, Band):
response = Band(source=Source(path=response),
bidx=1)
start_method = mpc_params.get('start_method', None)
view_size = mpc_params.get('view_size')
nbr_workers = get_nbr_workers(number=mpc_params.get('nbrcpu', None))
src_profile = response.source.import_profile()
src_width = int(src_profile.get('width'))
src_height = int(src_profile.get('height'))
# create a list of views and put it into runner_params
size = (src_width, src_height)
border = (0, 0)
# _ is for the inner_views which we do not need
views, _ = create_views(view_size=view_size,
border=border,
size=size)
part_params = []
for view in views:
pparams = dict(predictors=predictors,
view=view,
selector=selector,
include_intercept=include_intercept)
part_params.append(pparams)
# create the arguments for the aggregation script
# start the processes
manager = Manager()
output_q = manager.Queue()
if verbose:
print(f"using {nbr_workers=}")
with get_or_set_context(start_method).Pool(nbr_workers) as pool:
# start the aggregation step
matrix_aggregator = pool.apply_async(
lph._combine_matrices,
(output_q,)
)
all_jobs = []
for pparams in part_params:
all_jobs.append(pool.apply_async(
lph._partial_transposed_product,
(pparams, output_q)
))
# now lets wait for all of these jobs to finish
job_timers = []
for job in all_jobs:
# await for the jobs to return (i.e. complete) by calling .get
# get the duration from the timer object that is returned by .get()
job_timers.append(job.get())
# send the final kill job to the queue
output_q.put(dict(signal='kill'))
# wait for the recombination job to terminate
recombined_tpX, _ = matrix_aggregator.get()
return recombined_tpX
[docs]
def get_optimal_betas(*predictors: Band | str,
Y: np.ndarray,
response: str | Band,
selector,
include_intercept=True,
verbose: bool = False,
as_dtype=np.float64,
**mpc_params
) -> dict[Band | str, float]:
"""
Calculate optimal regression coefficients (:math:`\\beta`) in parallel from spatial data.
This function computes the optimal weights ((:math:`\\beta`) coefficients) for a
multiple linear regression by processing predictor data in parallel blocks.
The computation solves for beta in the normal equation (ordinary least squares problem):
.. math::
\\beta = (X^T X)^{-1} X^T y
where :math:`X` is the design matrix of predictors and :math:`y` is the response vector.
This approach is memory-efficient for large spatial datasets as it avoids
loading the entire design matrix into memory at once.
Parameters
----------
*predictors : Band or str
Variable number of predictor bands to include in the regression.
Can be Band objects or paths to source files.
See :func:`~coonfit.inference.prepare_predictors` for details on predictor
specification.
Y : NDArray
Pre-inverted transpose-product matrix, :math:`(X^T X)^{-1}`, of shape
(n_predictors, n_predictors). Typically obtained via
``numpy.linalg.inv(get_XT_X(...))``.
Shape is (n_predictors+1, n_predictors+1) if `include_intercept=True`.
response : str or Band
Path to a tif file or Band object containing the response data.
Used to determine spatial dimensions for block processing.
selector : NDArray
Boolean array (:class:`numpy.bool_`) to select usable cells. Must have the same
spatial dimensions as the response. True values indicate pixels to
include in the regression.
include_intercept : bool, optional
If True, fit an intercept term by appending a column of ones to the
design matrix X. The intercept will be included in the returned
dictionary with key 'intercept'. Default is True.
verbose : bool, optional
If True, print runtime information including number of workers,
predictors, and computed beta values. Default is False.
as_dtype : dtype, optional
Data type to use for internal computations. Default is np.float64.
**mpc_params : dict
Multiprocessing configuration parameters.
Required:
- view_size : tuple of int
Size (width, height) in pixels of a single view/block to process.
Optional:
- nbrcpu : int, optional
Number of CPUs to use. If not set, uses (available threads - 1).
- start_method : str, optional
Starting method for multiprocessing ('fork', 'spawn', or
'forkserver').
Returns
-------
optimal_weights : dict of {Band or str: float}
Dictionary mapping each predictor to its optimal regression coefficient.
If `include_intercept=True`, includes an additional entry with key
'intercept' for the intercept term (beta_0).
Raises
------
ValueError
If the number of computed beta values does not match the number of
predictors (plus intercept if applicable).
Notes
-----
The function implements parallel computation by:
1. Dividing the spatial domain into non-overlapping blocks via
:func:`~riogrande.prepare.create_views`
2. Computing partial contributions for each block via
:func:`~coonfit.parallel_helpers._partial_optimal_betas`
3. Aggregating the partial results via
:func:`~coonfit.parallel_helpers._combine_matrices`
See Also
--------
:func:`get_XT_X` : Compute ``X.T @ X``, whose inverse is passed as ``Y``.
:func:`compute_weights` : Full workflow wrapping this function.
Examples
--------
>>> # Compute optimal betas with intercept
>>> selector = np.ones((1000, 1000), dtype=bool)
>>> Y = np.linalg.inv(XT_X) # Pre-inverted (X.T @ X)^{-1}
>>> weights = get_optimal_betas(
... band1, band2, band3,
... Y=Y,
... response='response.tif',
... selector=selector,
... include_intercept=True,
... view_size=(512, 512),
... nbrcpu=4
... )
>>> weights
{<Band: band1>: 0.523, <Band: band2>: 1.245, <Band: band3>: -0.334, 'intercept': 5.12}
>>> # Without intercept
>>> Y = np.array([10.5, 20.3, 15.7])
>>> weights = get_optimal_betas(
... band1, band2, band3,
... Y=Y,
... response='response.tif',
... selector=selector,
... include_intercept=False,
... view_size=(512, 512)
... )
>>> weights
{<Band: band1>: 0.523, <Band: band2>: 1.245, <Band: band3>: -0.334}
"""
print(f'get_optimal_betas - {response=}, {predictors=}')
if not isinstance(response, Band):
response = Band(source=Source(path=response),
bidx=1)
start_method = mpc_params.get('start_method', None)
view_size = mpc_params.get('view_size')
nbr_workers = get_nbr_workers(number=mpc_params.get('nbrcpu', None))
src_profile = response.source.import_profile()
src_width = int(src_profile.get('width'))
src_height = int(src_profile.get('height'))
size = (src_width, src_height)
border = (0, 0)
# _ is for the inner_views which we do not need
views, _ = create_views(view_size=view_size,
border=border,
size=size)
# define the parameters for each job
part_params = []
for view in views:
pparams = dict(Y=Y,
response=response,
predictors=predictors,
view=view,
selector=selector,
include_intercept=include_intercept,
as_dtype=as_dtype
)
part_params.append(pparams)
# create the arguments for the aggregation script
# start the processes
manager = Manager()
output_q = manager.Queue()
if verbose:
print(f"using {nbr_workers=}")
with get_or_set_context(start_method).Pool(nbr_workers) as pool:
# start the aggregation step
matrix_aggregator = pool.apply_async(
lph._combine_matrices,
(output_q,)
)
all_jobs = []
for pparams in part_params:
all_jobs.append(pool.apply_async(
lph._partial_optimal_betas,
(pparams, output_q)
))
# now lets wait for all of these jobs to finish
job_timers = []
for job in all_jobs:
# await for the jobs to return (i.e. complete) by calling .get
# get the duration from the timer object that is returned by .get()
job_timers.append(job.get())
# send the final kill job to the queue
output_q.put(dict(signal='kill'))
# wait for the recombination job to terminate
betas, _ = matrix_aggregator.get()
# append predictor if intercept set
if include_intercept:
pred_list = list(predictors)
pred_list.append('intercept')
predictors = tuple(pred_list)
if len(betas) != len(predictors):
raise ValueError(f"Number of predictors {len(predictors)} not equal with number of fitted values {len(betas)}")
if verbose:
print(f"{predictors=}")
print(f"{betas=}")
return {pred: beta for pred, beta in zip(predictors, betas)}
[docs]
def get_XT_X_dependency(response: str | Band,
predictors: Collection[Band],
block_size: Union[dict[str, tuple[int, int]], tuple[int, int]],
include_intercept: bool = True,
limit_contribution: float = 0.0,
no_data: Union[int, float] = 0.0,
sanitize_predictors: bool = False,
verbose: bool = False,
**params
) -> dict[Band, str]:
"""Test predictors for linear dependency before fitting multiple linear regression.
This function checks whether predictor columns are linearly dependent by
computing the :math:`X^T X` matrix and analyzing its rank. Linear dependencies
can cause numerical instability or singularity in regression fitting and
should be resolved before proceeding with model estimation.
Parameters
----------
response : str or Band
Path to a tif file or Band object containing the response data.
Used to determine spatial dimensions and create the selector mask.
predictors : Collection of Band
Collection of predictor bands to test for linear dependency.
See :func:`~coonfit.inference.prepare_predictors` for details on predictor
specification.
block_size : tuple of int or dict of {str: tuple of int}
Block sizes for parallel processing functions.
- If tuple: (width, height) in pixels, applied to all processing steps.
- If dict: Maps function names to specific block sizes with keys:
'prepare_selector', 'get_XT_X'. Each value should be a tuple
(width, height).
include_intercept : bool, optional
If True, include an intercept term when computing ``X.T @ X``.
Default is True.
limit_contribution : float, optional
Minimum fraction of valid cells each predictor must contribute to
be considered valid. Value between 0.0 and 1.0.
Default is 0.0 (a single valid value is sufficient).
no_data : int or float, optional
Value representing invalid or missing data. Cells with this value
are excluded from analysis. Default is 0.0.
sanitize_predictors : bool, optional
If True, automatically remove predictors that fail the contribution
threshold. If False, raise an exception when invalid predictors are
found. Default is False.
verbose : bool, optional
If True, print processing step information. Default is False.
**params : dict
Optional multiprocessing arguments:
- nbrcpu : int, optional
Number of CPUs to use. If not set, uses (available CPUs - 1).
- start_method : str, optional
Process start method: 'spawn', 'fork', or 'forkserver'.
Returns
-------
dependencies : dict of {Band: str} or None
Dictionary mapping each rank-deficient predictor to a description
of its linear dependency. Returns None if the selector masks all
pixels (no data to fit).
If no linear dependencies are found, returns an empty dictionary.
Rank deficiency is checked using :func:`~coonfit.helper.check_rank_deficiency`.
See Also
--------
:func:`compute_weights` : Full fitting workflow using this dependency check.
:func:`get_XT_X` : Compute the ``X.T @ X`` matrix analyzed here.
"""
# if block sizes are provided as dictionary - some pre-check on input is desired - else
block_size_params = dict(prepare_selector=None, get_XT_X=None)
if isinstance(block_size, tuple):
for key in block_size_params:
block_size_params[key] = block_size
if isinstance(block_size, dict):
if block_size_params.keys() != block_size.keys() or not all(isinstance(v, tuple) for v in block_size.values()):
raise ValueError(f"Block size dict does not conform with all necessary keys and value-types: {block_size=}")
block_size_params.update(block_size)
if not isinstance(response, Band):
response = Band(source=Source(path=response),
bidx=1)
print("Creating selector...")
selector = rgpara.prepare_selector(
response,
*predictors,
block_size=block_size_params["prepare_selector"],
verbose=verbose,
**params
)
# If selector is empty (meaning all is FALSE) - no need to proceed
_vals = np.unique(selector)
if len(_vals) == 1:
if _vals[0] == False: # if not _vals[0] is less explicit
print(f"WARNING: no pixels to fit, selector masks all pixels")
return None
print("Check consistency of remaining predictor data...")
nbr_predictors = len(predictors)
predictors = lph._check_predictor_consistency(predictors,
selector=selector,
tolerance=limit_contribution,
sanitize=sanitize_predictors,
no_data=no_data,
verbose=verbose,
**params)
if len(predictors) != nbr_predictors:
# for details here: see compute_weights
selector = rgpara.prepare_selector(
response,
*predictors,
block_size=block_size_params["prepare_selector"],
verbose=verbose,
**params
)
print("Calculate X.T @ X...")
tpX = get_XT_X(response,
*predictors,
selector=selector,
include_intercept=include_intercept,
verbose=verbose,
view_size=block_size_params["get_XT_X"],
**params)
print("Check linear dependency...")
rank_def_cols = check_rank_deficiency(tpX)
return {predictors[k]: v for k, v in rank_def_cols.items()}
[docs]
def compute_weights(response: str | Band,
predictors: Collection[Band],
block_size: Union[dict[str, tuple[int, int]], tuple[int, int]],
include_intercept: bool = True,
as_dtype=np.float64,
limit_contribution: float = 0.0,
no_data: Union[int, float] = 0.0,
sanitize_predictors: bool = False,
return_linear_dependent_predictors: bool = False,
verbose: bool = False,
**params
) -> dict[Band, float] | dict[Band, str] | None:
"""Compute optimal regression coefficients for multiple linear regression.
This function fits a multiple linear regression model by computing the
optimal weights (beta coefficients) for each predictor. The computation
involves creating a selector mask, validating predictor consistency,
calculating the :math:`X^T X` matrix, checking for linear dependencies, and
solving the normal equations.
Parameters
----------
response : str or Band
Path to a tif file or Band object containing the response data.
Used to determine spatial dimensions and create the selector mask.
predictors : Collection of Band
Collection of predictor bands to use in the regression.
See :func:`~coonfit.inference.prepare_predictors` for details on predictor
specification.
block_size : tuple of int or dict of {str: tuple of int}
Block sizes for parallel processing functions.
- If tuple: (width, height) in pixels, applied to all processing steps.
- If dict: Maps function names to specific block sizes with keys:
'prepare_selector', 'get_XT_X', 'get_optimal_betas'. Each value
should be a tuple (width, height).
include_intercept : bool, optional
If True, fit an intercept term in the regression model.
Default is True.
as_dtype : dtype, optional
Data type for internal computations and output weights.
Default is np.float64.
limit_contribution : float, optional
Minimum fraction of valid cells each predictor must contribute to
be considered valid. Value between 0.0 and 1.0.
Default is 0.0 (a single valid value is sufficient).
no_data : int or float, optional
Value representing invalid or missing data. Cells with this value
are excluded from analysis. Default is 0.0.
sanitize_predictors : bool, optional
If True, automatically remove predictors that fail the contribution
threshold. If False, raise an exception when invalid predictors are
found. Default is False.
return_linear_dependent_predictors : bool, optional
If True and linear dependencies are detected, return a dictionary
mapping linearly dependent predictors to their dependency type instead
of computing weights. This stops the fitting process.
If False and dependencies are detected, raise an error.
Default is False.
verbose : bool, optional
If True, print processing step information. Default is False.
**params : dict
Optional arguments:
- extra_masking_band : Band or None, optional
Additional Band object to use directly as a mask. All cells with
value 0 are masked.
- nbrcpu : int, optional
Number of CPUs to use. If not set, uses (available CPUs - 1).
- start_method : str, optional
Process start method: 'spawn', 'fork', or 'forkserver'.
Returns
-------
optimal_weights : dict of {Band or str: float} or None
Dictionary mapping each predictor (and 'intercept' if
`include_intercept=True`) to its optimal regression coefficient.
Returns None if the selector masks all pixels or if linear
dependencies are detected and `return_linear_dependent_predictors=False`.
linear_dependencies : dict of {Band: str}
Only returned if `return_linear_dependent_predictors=True` and
linear dependencies are detected. Maps each linearly dependent
predictor to a description of its dependency type.
Raises
------
ValueError
If `block_size` is a dictionary but doesn't contain the required keys
or has invalid value types.
:exc:`~coonfit.exceptions.InvalidPredictorError`
If `sanitize_predictors=False` and predictors fail the contribution
threshold.
:exc:`numpy.linalg.LinAlgError`
If the X.T @ X matrix is singular and cannot be inverted (when
`return_linear_dependent_predictors=False`).
Warnings
--------
UserWarning
Issued when the selector masks all pixels (no data to fit).
UserWarning
Issued when linear dependencies are detected.
Notes
-----
The function performs the following workflow:
1. Creates a selector mask via :func:`~riogrande.parallel.prepare_selector`
2. Validates predictor consistency via
:func:`~coonfit.parallel_helpers._check_predictor_consistency`
(removes invalid predictors if `sanitize_predictors=True`)
3. Recalculates selector if predictors were removed
4. Computes :math:`X^T X` matrix in parallel via :func:`get_XT_X`
5. Checks for rank deficiency via :func:`~coonfit.helper.check_rank_deficiency`
6. Inverts :math:`X^T X` via :func:`numpy.linalg.inv` to obtain :math:`(X^T X)^{-1}`
7. Computes optimal weights via :func:`get_optimal_betas`
The optimal weights solve the ordinary least squares problem:
.. math::
\\beta = (X^T X)^{-1} X^T y
where :math:`X` is the design matrix of predictors and :math:`y` is the response vector.
Linear dependency detection excludes the intercept column from the rank
check, as the intercept is always included as the last column when
`include_intercept=True`.
If predictors are removed during sanitization, the selector is recomputed
to ensure consistency, as removed predictors may have contributed to
masking certain pixels.
See Also
--------
:func:`compute_model` : Apply the fitted weights to produce a model raster.
:func:`get_XT_X` : Compute ``X.T @ X`` used in the normal equations.
:func:`get_optimal_betas` : Compute regression coefficients given ``Y = (X.T @ X)^{-1}``.
:func:`calculate_rmse` : Evaluate model fit using RMSE.
:func:`get_XT_X_dependency` : Check for linear dependencies without full fitting.
Examples
--------
>>> # Basic usage with uniform block size
>>> predictors = [band1, band2, band3]
>>> weights = compute_weights(
... response='response.tif',
... predictors=predictors,
... block_size=(512, 512),
... include_intercept=True,
... verbose=True
... )
>>> weights
{<Band: band1>: 0.523, <Band: band2>: 1.245, <Band: band3>: -0.334, 'intercept': 5.12}
>>> # Use different block sizes for different steps
>>> block_sizes = {
... 'prepare_selector': (1024, 1024),
... 'get_XT_X': (512, 512),
... 'get_optimal_betas': (256, 256)
... }
>>> weights = compute_weights(
... response='response.tif',
... predictors=predictors,
... block_size=block_sizes,
... limit_contribution=0.05,
... sanitize_predictors=True,
... nbrcpu=4
... )
>>> # Detect and return linear dependencies
>>> deps = compute_weights(
... response='response.tif',
... predictors=predictors,
... block_size=(512, 512),
... return_linear_dependent_predictors=True
... )
>>> if isinstance(deps, dict) and all(isinstance(v, str) for v in deps.values()):
... print(f"Linear dependencies found: {deps}")
... else:
... print(f"Optimal weights: {deps}")
"""
# if block sizes are provided as dictionary - some pre-check on input is desired - else
block_size_params = dict(prepare_selector=None, get_XT_X=None, get_optimal_betas=None)
if isinstance(block_size, tuple):
for key in block_size_params:
block_size_params[key] = block_size
if isinstance(block_size, dict):
if block_size_params.keys() != block_size.keys() or not all(isinstance(v, tuple) for v in block_size.values()):
raise ValueError(f"Block size dict does not conform with all necessary keys and value-types: {block_size=}")
block_size_params.update(block_size)
if not isinstance(response, Band):
response = Band(source=Source(path=response),
bidx=1)
print("Creating selector...")
extra_masking_band = params.pop("extra_masking_band", None)
selector = rgpara.prepare_selector(
response,
*predictors,
block_size=block_size_params["prepare_selector"],
verbose=verbose,
extra_masking_band=extra_masking_band,
**params
)
# If selector is empty (meaning all is FALSE) - no need to proceed
_vals = np.unique(selector)
if len(_vals) == 1:
if _vals[0] == False: # if not _vals[0] is less explicit
warnings.warn("No pixels to fit - selector masks all pixels", UserWarning)
return None
print("Check consistency of remaining predictor data...")
nbr_predictors = len(predictors)
predictors = lph._check_predictor_consistency(predictors,
selector=selector,
tolerance=limit_contribution,
sanitize=sanitize_predictors,
no_data=no_data,
verbose=verbose,
**params)
if len(predictors) != nbr_predictors:
# the consistency check removed some predictors
# we re-create the selector in this case since the dropped out
# predictor(s) might have masked some cells
selector = rgpara.prepare_selector(
response,
*predictors,
block_size=block_size_params["prepare_selector"],
verbose=verbose,
extra_masking_band=extra_masking_band,
**params
)
# NOTE: We do not need to _check_predictor_consistency again since
# removing the predictor leads to at least the same valid
# pixels, if not more.
print("Calculate X.T @ X...")
tpX = get_XT_X(response,
*predictors,
selector=selector,
include_intercept=include_intercept,
verbose=verbose,
view_size=block_size_params["get_XT_X"],
**params)
print("Check linear dependency...")
# Check rank deficiency of matrix
_check_tpX = tpX.copy()
if include_intercept:
# if intercept fitted
# we don't want to check it for lin dependency (its the last column always - see XT X)
_check_tpX = _check_tpX[:, :-1]
rank_def = check_rank_deficiency(_check_tpX)
if rank_def:
linear_dependent_predictors = {predictors[k]: v for k, v in rank_def.items()}
if return_linear_dependent_predictors:
print(f"WARNING: Rank deficiency detected returning affected predictors")
return linear_dependent_predictors
else:
warnings.warn(f"Matrix not invertible - rank deficiency detected. "
f"Linear dependent predictors: {linear_dependent_predictors}", UserWarning)
return None
print("Inverting X.T @ X...")
Y = np.linalg.inv(tpX)
# print(f"{tpX=}\n{Y=}")
# print("#####\n#####\n#####")
print("Calculate Y @ X.T @ y (optimal weights)...")
betas_dict = get_optimal_betas(*predictors,
Y=Y,
response=response,
selector=selector,
include_intercept=include_intercept,
verbose=verbose,
as_dtype=as_dtype,
view_size=block_size_params["get_optimal_betas"],
**params)
return betas_dict
[docs]
def calculate_rmse(response: str | Band,
model: str | Band,
selector: NDArray[np.bool_],
block_size: Union[dict[str, tuple[int, int]], tuple[int, int]],
verbose: bool = False,
**params) -> float:
"""
Compute the Root Mean Square Error (RMSE) for a predicted model and observed response data.
RMSE measures the average magnitude of the residuals between predicted and
observed values and is defined as:
.. math::
\\text{RMSE} = \\sqrt{\\frac{\\sum_{i=1}^{n}(\\hat{y}_i - y_i)^2}{n}}
where :math:`\\hat{y}_i` are model-predicted values, :math:`y_i` are observed
response values, and :math:`n` is the number of valid observations.
The function processes data in blocks for memory efficiency and parallelization.
Parameters
----------
response : Band or str
A `Band` object or a path to a raster/tif file containing the observed response data.
model : Band or str
A `Band` object or a path to a raster/tif file containing predicted values from the model.
selector : NDArray
Boolean mask (:class:`numpy.bool_`) specifying which data points should be included in the RMSE calculation.
Points where `selector` is False are ignored.
block_size : tuple[int, int] or dict[str, tuple[int, int]]
Size of the block (width, height) in pixels for processing data chunks.
If a dictionary, it should contain named blocks.
verbose : bool, default=False
If True, prints status information during computation.
**params : dict
Additional parameters for parallel processing:
- `nbr_cpus` (int): Number of CPUs to use (default: all available minus one).
- `start_method` (str): Multiprocessing start method ('spawn', 'fork', or 'forkserver').
Returns
-------
float
The RMSE value. Lower values indicate better model fit.
Computed via :func:`~coonfit.parallel_helpers._block_ssr`.
See Also
--------
:func:`calculate_r2` : Compute the coefficient of determination (R²).
:func:`compute_weights` : Fit the regression weights used to generate the model.
Examples
--------
>>> response_band = Band(source=Source(path="response.tif"), bidx=1)
>>> model_band = Band(source=Source(path="model.tif"), bidx=1)
>>> selector_mask = np.ones((height, width), dtype=bool)
>>> rmse = calculate_rmse(response_band, model_band, selector_mask, block_size=(512, 512))
>>> print(f"RMSE = {rmse:.3f}")
"""
if not isinstance(response, Band):
response = Band(source=Source(path=response), bidx=1)
if not isinstance(model, Band):
model = Band(source=Source(path=model), bidx=1)
# Check compatibility
response.source.check_compatibility(model.source)
src_profile = response.source.import_profile()
width = int(src_profile.get('width'))
height = int(src_profile.get('height'))
# Parameter for the individual jobs
_, inner_views = create_views(view_size=block_size,
border=(0, 0),
size=(width, height))
block_params = []
for view in inner_views:
bparams = dict(view=view,
response=response,
selector=selector,
model=model, )
block_params.append(bparams)
manager = Manager()
ssr_parts = manager.list()
start_method = params.get('start_method', None)
# get number of workers
nbr_workers = get_nbr_workers(number=params.pop('nbrcpu', None))
if verbose:
print(f"using {nbr_workers=}")
with get_or_set_context(start_method).Pool(nbr_workers) as pool:
# start the block calculation processing
all_jobs = []
for pparams in block_params:
all_jobs.append(pool.apply_async(lph._block_ssr,
(pparams, ssr_parts)))
# collect results
job_timers = []
for job in all_jobs:
job_timers.append(job.get())
pool.close()
pool.join()
# Aggregate results
total_ssr = sum(res[0] for res in ssr_parts)
total_n = sum(res[1] for res in ssr_parts)
rmse = np.sqrt(total_ssr / total_n)
return rmse
[docs]
def calculate_r2(response: str | Band,
model: str | Band,
selector: NDArray[np.bool_],
block_size: Union[dict[str, tuple[int, int]], tuple[int, int]],
verbose: bool = False,
**params) -> float:
"""
Compute the Coefficient of Determination (:math:`R^2`) for a predicted model and observed response data.
The coefficient of determination, :math:`R^2`, quantifies the proportion of
variance in the response variable that is predictable from the model:
.. math::
R^2 = 1 - \\frac{SS_{\\text{res}}}{SS_{\\text{tot}}}
where the sum of squared residuals and total sum of squares are defined as:
.. math::
SS_{\\text{res}} = \\sum_{i=1}^{n}(y_i - \\hat{y}_i)^2, \\qquad
SS_{\\text{tot}} = \\sum_{i=1}^{n}(y_i - \\bar{y})^2
with :math:`y_i` the observed values, :math:`\\hat{y}_i` the model-predicted
values, and :math:`\\bar{y}` the mean of the observed values.
The function processes data in blocks for memory efficiency and parallelization.
Parameters
----------
response : Band or str
A `Band` object or a path to a raster/tif file containing the observed response data.
model : Band or str
A `Band` object or a path to a raster/tif file containing predicted values from the model.
selector : NDArray
Boolean mask (:class:`numpy.bool_`) to specify which data points should be included in the R² calculation.
Points where `selector` is False are ignored. The user must prepare the mask appropriately.
block_size : tuple[int, int] or dict[str, tuple[int, int]]
Size of the block (width, height) in pixels for processing data chunks.
If a dictionary, it should contain named blocks.
verbose : bool, default=False
If True, prints status information during computation.
**params : dict
Additional parameters for parallel processing:
- `nbr_cpus` (int): Number of CPUs to use (default: all available minus one).
- `start_method` (str): Multiprocessing start method ('spawn', 'fork', or 'forkserver').
Returns
-------
float
The :math:`R^2` coefficient ranging from -∞ to 1, where 1 indicates perfect prediction.
Computed via :func:`~coonfit.parallel_helpers._block_ssr` and
:func:`~coonfit.parallel_helpers._block_sst`.
See Also
--------
:func:`calculate_rmse` : Compute the Root Mean Square Error.
:func:`compute_weights` : Fit the regression weights used to generate the model.
Examples
--------
>>> response_band = Band(source=Source(path="response.tif"), bidx=1)
>>> model_band = Band(source=Source(path="model.tif"), bidx=1)
>>> selector_mask = np.ones((height, width), dtype=bool)
>>> r2 = calculate_r2(response_band, model_band, selector_mask, block_size=(512, 512))
>>> print(f"R² = {r2:.3f}")
"""
if not isinstance(response, Band):
response = Band(source=Source(path=response), bidx=1)
if not isinstance(model, Band):
model = Band(source=Source(path=model), bidx=1)
# Check compatibility
response.source.check_compatibility(model.source)
src_profile = response.source.import_profile()
width = int(src_profile.get('width'))
height = int(src_profile.get('height'))
# Parameter for the individual jobs
_, inner_views = create_views(view_size=block_size,
border=(0, 0),
size=(width, height))
# Calculate overall mean of response (needed for SST)
total_sum = 0
total_n = 0
for view in inner_views:
window = view_to_window(view)
response_data = response.get_data(window=window)
_selector = selector[window.toslices()]
response_data[~_selector] = np.nan
total_sum += np.nansum(response_data)
total_n += np.count_nonzero(~np.isnan(response_data))
y_mean = total_sum / total_n
# Block parameters
block_params = []
for view in inner_views:
bparams = dict(view=view,
response=response,
model=model,
selector=selector,
y_mean=y_mean)
block_params.append(bparams)
manager = Manager()
ssr_parts = manager.list()
sst_parts = manager.list()
start_method = params.get('start_method', None)
# get number of workers
nbr_workers = get_nbr_workers(number=params.pop('nbrcpu', None))
if verbose:
print(f"using {nbr_workers=}")
with get_or_set_context(start_method).Pool(nbr_workers) as pool:
# start the block calculation processing
all_jobs = []
for pparams in block_params:
all_jobs.append(pool.apply_async(lph._block_ssr,
(pparams, ssr_parts)))
all_jobs.append(pool.apply_async(lph._block_sst,
(pparams, sst_parts)))
# collect results
job_timers = []
for job in all_jobs:
job_timers.append(job.get())
pool.close()
pool.join()
# Aggregate results
total_ssr = sum(res[0] for res in ssr_parts)
total_sst = sum(res[0] for res in sst_parts)
r2 = 1 - (total_ssr / total_sst)
return r2