coonfit.parallel ================ .. py:module:: coonfit.parallel .. autoapi-nested-parse:: High-level parallelized workflows for spatial multiple linear regression. This module exposes the main user-facing functions of the ``coonfit`` package. All computations are distributed across worker processes and operate on raster data block by block, making them suitable for large spatial datasets that do not fit in memory. Key functions: - :func:`compute_weights` — Full end-to-end workflow: builds the selector mask, validates predictors, computes :math:`X^T X`, checks for linear dependencies, inverts the matrix, and returns the optimal regression weights. - :func:`compute_model` — Applies fitted weights to predictor rasters and writes the model prediction to a new GeoTIFF. - :func:`get_XT_X` — Parallelized computation of the transposed product :math:`X^T X` across spatial blocks. - :func:`get_optimal_betas` — Parallelized computation of regression coefficients given a pre-inverted :math:`(X^T X)^{-1}`. - :func:`get_XT_X_dependency` — Checks for linear dependencies among predictors without running the full fitting pipeline. - :func:`calculate_rmse` — Computes Root Mean Square Error (RMSE) between the model and observed response. - :func:`calculate_r2` — Computes the coefficient of determination (:math:`R^2`). Functions --------- .. autoapisummary:: coonfit.parallel.calculate_r2 coonfit.parallel.calculate_rmse coonfit.parallel.compute_model coonfit.parallel.compute_weights coonfit.parallel.get_XT_X coonfit.parallel.get_XT_X_dependency coonfit.parallel.get_optimal_betas Module Contents --------------- .. py:function:: calculate_r2(response, model, selector, block_size, verbose = False, **params) Compute the Coefficient of Determination (:math:`R^2`) for a predicted model and observed response data. The coefficient of determination, :math:`R^2`, quantifies the proportion of variance in the response variable that is predictable from the model: .. math:: R^2 = 1 - \frac{SS_{\text{res}}}{SS_{\text{tot}}} where the sum of squared residuals and total sum of squares are defined as: .. math:: SS_{\text{res}} = \sum_{i=1}^{n}(y_i - \hat{y}_i)^2, \qquad SS_{\text{tot}} = \sum_{i=1}^{n}(y_i - \bar{y})^2 with :math:`y_i` the observed values, :math:`\hat{y}_i` the model-predicted values, and :math:`\bar{y}` the mean of the observed values. The function processes data in blocks for memory efficiency and parallelization. :param response: A `Band` object or a path to a raster/tif file containing the observed response data. :type response: Band or str :param model: A `Band` object or a path to a raster/tif file containing predicted values from the model. :type model: Band or str :param selector: Boolean mask (:class:`numpy.bool_`) to specify which data points should be included in the R² calculation. Points where `selector` is False are ignored. The user must prepare the mask appropriately. :type selector: NDArray :param block_size: Size of the block (width, height) in pixels for processing data chunks. If a dictionary, it should contain named blocks. :type block_size: tuple[int, int] or dict[str, tuple[int, int]] :param verbose: If True, prints status information during computation. :type verbose: bool, default=False :param \*\*params: Additional parameters for parallel processing: - `nbr_cpus` (int): Number of CPUs to use (default: all available minus one). - `start_method` (str): Multiprocessing start method ('spawn', 'fork', or 'forkserver'). :type \*\*params: dict :returns: The :math:`R^2` coefficient ranging from -∞ to 1, where 1 indicates perfect prediction. Computed via :func:`~coonfit.parallel_helpers._block_ssr` and :func:`~coonfit.parallel_helpers._block_sst`. :rtype: float .. seealso:: :func:`calculate_rmse` Compute the Root Mean Square Error. :func:`compute_weights` Fit the regression weights used to generate the model. .. rubric:: Examples >>> response_band = Band(source=Source(path="response.tif"), bidx=1) >>> model_band = Band(source=Source(path="model.tif"), bidx=1) >>> selector_mask = np.ones((height, width), dtype=bool) >>> r2 = calculate_r2(response_band, model_band, selector_mask, block_size=(512, 512)) >>> print(f"R² = {r2:.3f}") .. py:function:: calculate_rmse(response, model, selector, block_size, verbose = False, **params) Compute the Root Mean Square Error (RMSE) for a predicted model and observed response data. RMSE measures the average magnitude of the residuals between predicted and observed values and is defined as: .. math:: \text{RMSE} = \sqrt{\frac{\sum_{i=1}^{n}(\hat{y}_i - y_i)^2}{n}} where :math:`\hat{y}_i` are model-predicted values, :math:`y_i` are observed response values, and :math:`n` is the number of valid observations. The function processes data in blocks for memory efficiency and parallelization. :param response: A `Band` object or a path to a raster/tif file containing the observed response data. :type response: Band or str :param model: A `Band` object or a path to a raster/tif file containing predicted values from the model. :type model: Band or str :param selector: Boolean mask (:class:`numpy.bool_`) specifying which data points should be included in the RMSE calculation. Points where `selector` is False are ignored. :type selector: NDArray :param block_size: Size of the block (width, height) in pixels for processing data chunks. If a dictionary, it should contain named blocks. :type block_size: tuple[int, int] or dict[str, tuple[int, int]] :param verbose: If True, prints status information during computation. :type verbose: bool, default=False :param \*\*params: Additional parameters for parallel processing: - `nbr_cpus` (int): Number of CPUs to use (default: all available minus one). - `start_method` (str): Multiprocessing start method ('spawn', 'fork', or 'forkserver'). :type \*\*params: dict :returns: The RMSE value. Lower values indicate better model fit. Computed via :func:`~coonfit.parallel_helpers._block_ssr`. :rtype: float .. seealso:: :func:`calculate_r2` Compute the coefficient of determination (R²). :func:`compute_weights` Fit the regression weights used to generate the model. .. rubric:: Examples >>> response_band = Band(source=Source(path="response.tif"), bidx=1) >>> model_band = Band(source=Source(path="model.tif"), bidx=1) >>> selector_mask = np.ones((height, width), dtype=bool) >>> rmse = calculate_rmse(response_band, model_band, selector_mask, block_size=(512, 512)) >>> print(f"RMSE = {rmse:.3f}") .. py:function:: compute_model(predictors, optimal_weights, output_file, block_size, predictors_as_dtype = None, profile = None, selector = None, selector_band = None, verbose = False, **params) Create a ``.tif`` file with the model prediction values from a fitted model. :param predictors: Collection of predictor bands used in the multiple linear regression. :type predictors: Collection of Band :param optimal_weights: Holding for each predictor the optimal weight. If weights include a key named "intercept", this will be used as the intercept (beta0) for the model prediction. If a `selector_band` is provided, then it must hold for each categorical value (key) a dictionary with the optimal weights per predictor. See the `selector_band` parameter for more details. :type optimal_weights: dict of dict, dict, or None :param output_file: Path to where the model result should be written to. :type output_file: str :param block_size: Size (width, height) in pixels of the block that a single job processes. :type block_size: tuple of int :param predictors_as_dtype: Datatype to convert predictor input to (e.g. np.float32) prior to computing their contribution. .. note:: Only a type conversion is supported prior to computing the predictors contribution. Rescaling of a predictor needs to happen in a separate step beforehand. :type predictors_as_dtype: str, type, or None, optional :param profile: The profile to use for the newly created output tif. By default the profile is copied from the first source of the predictor bands, updating the count to 1. :type profile: dict or None, optional :param selector: A selector array to use to selectively calculate the model prediction. If a boolean array is provided then it is applied as an (inverted) mask: only pixels that result to `True` are calculated. If a categorical array (np.uint8) is provided instead, then it is assumed that the `optimal_weights` use it as a selector for (masking) the processing of the model. :type selector: NDArray of bool or None, optional :param selector_band: A band object with categorical data. If provided, the `optimal_weights` needs to hold for each of the category a dictionary with predictor specific weights. :type selector_band: Band or None, optional :param verbose: Print out processing steps. Default is False. :type verbose: bool, optional :param \*\*params: Optional arguments for the multiprocessing: - nbrcpu : int, optional Number of CPUs to use for parallel processing. - start_method : str, optional Multiprocessing start method ('fork', 'spawn', or 'forkserver'). - compress : bool, optional If True, apply LZW compression to the final output file. :type \*\*params: dict :returns: **output_file** -- Path to the newly created tif file holding the model prediction data. If compression is enabled, this will be the path to the compressed file. :rtype: str .. rubric:: Notes The function uses multiprocessing to process the image in blocks for improved performance. Each block is processed independently by :func:`~coonfit.parallel_helpers._block_model_prediction` and results are aggregated by :func:`~riogrande.parallel.combine_views` into the final output file. Block decomposition uses :func:`~riogrande.prepare.create_views`. Worker pool is sized via :func:`~riogrande.helper.get_nbr_workers` and created with :func:`~riogrande.helper.get_or_set_context`. Timing information for each job and the total duration is printed to standard output upon completion. .. seealso:: :func:`compute_weights` Compute the optimal weights used as input here. :func:`calculate_rmse` Evaluate the model quality after calling this function. :func:`calculate_r2` Compute R² for the model output. .. py:function:: compute_weights(response, predictors, block_size, include_intercept = True, as_dtype=np.float64, limit_contribution = 0.0, no_data = 0.0, sanitize_predictors = False, return_linear_dependent_predictors = False, verbose = False, **params) Compute optimal regression coefficients for multiple linear regression. This function fits a multiple linear regression model by computing the optimal weights (beta coefficients) for each predictor. The computation involves creating a selector mask, validating predictor consistency, calculating the :math:`X^T X` matrix, checking for linear dependencies, and solving the normal equations. :param response: Path to a tif file or Band object containing the response data. Used to determine spatial dimensions and create the selector mask. :type response: str or Band :param predictors: Collection of predictor bands to use in the regression. See :func:`~coonfit.inference.prepare_predictors` for details on predictor specification. :type predictors: Collection of Band :param block_size: Block sizes for parallel processing functions. - If tuple: (width, height) in pixels, applied to all processing steps. - If dict: Maps function names to specific block sizes with keys: 'prepare_selector', 'get_XT_X', 'get_optimal_betas'. Each value should be a tuple (width, height). :type block_size: tuple of int or dict of {str: tuple of int} :param include_intercept: If True, fit an intercept term in the regression model. Default is True. :type include_intercept: bool, optional :param as_dtype: Data type for internal computations and output weights. Default is np.float64. :type as_dtype: dtype, optional :param limit_contribution: Minimum fraction of valid cells each predictor must contribute to be considered valid. Value between 0.0 and 1.0. Default is 0.0 (a single valid value is sufficient). :type limit_contribution: float, optional :param no_data: Value representing invalid or missing data. Cells with this value are excluded from analysis. Default is 0.0. :type no_data: int or float, optional :param sanitize_predictors: If True, automatically remove predictors that fail the contribution threshold. If False, raise an exception when invalid predictors are found. Default is False. :type sanitize_predictors: bool, optional :param return_linear_dependent_predictors: If True and linear dependencies are detected, return a dictionary mapping linearly dependent predictors to their dependency type instead of computing weights. This stops the fitting process. If False and dependencies are detected, raise an error. Default is False. :type return_linear_dependent_predictors: bool, optional :param verbose: If True, print processing step information. Default is False. :type verbose: bool, optional :param \*\*params: Optional arguments: - extra_masking_band : Band or None, optional Additional Band object to use directly as a mask. All cells with value 0 are masked. - nbrcpu : int, optional Number of CPUs to use. If not set, uses (available CPUs - 1). - start_method : str, optional Process start method: 'spawn', 'fork', or 'forkserver'. :type \*\*params: dict :returns: * **optimal_weights** (*dict of {Band or str: float} or None*) -- Dictionary mapping each predictor (and 'intercept' if `include_intercept=True`) to its optimal regression coefficient. Returns None if the selector masks all pixels or if linear dependencies are detected and `return_linear_dependent_predictors=False`. * **linear_dependencies** (*dict of {Band: str}*) -- Only returned if `return_linear_dependent_predictors=True` and linear dependencies are detected. Maps each linearly dependent predictor to a description of its dependency type. :raises ValueError: If `block_size` is a dictionary but doesn't contain the required keys or has invalid value types. :raises ~coonfit.exceptions.InvalidPredictorError: If `sanitize_predictors=False` and predictors fail the contribution threshold. :raises numpy.linalg.LinAlgError: If the X.T @ X matrix is singular and cannot be inverted (when `return_linear_dependent_predictors=False`). .. warning:: UserWarning Issued when the selector masks all pixels (no data to fit). UserWarning Issued when linear dependencies are detected. .. rubric:: Notes The function performs the following workflow: 1. Creates a selector mask via :func:`~riogrande.parallel.prepare_selector` 2. Validates predictor consistency via :func:`~coonfit.parallel_helpers._check_predictor_consistency` (removes invalid predictors if `sanitize_predictors=True`) 3. Recalculates selector if predictors were removed 4. Computes :math:`X^T X` matrix in parallel via :func:`get_XT_X` 5. Checks for rank deficiency via :func:`~coonfit.helper.check_rank_deficiency` 6. Inverts :math:`X^T X` via :func:`numpy.linalg.inv` to obtain :math:`(X^T X)^{-1}` 7. Computes optimal weights via :func:`get_optimal_betas` The optimal weights solve the ordinary least squares problem: .. math:: \beta = (X^T X)^{-1} X^T y where :math:`X` is the design matrix of predictors and :math:`y` is the response vector. Linear dependency detection excludes the intercept column from the rank check, as the intercept is always included as the last column when `include_intercept=True`. If predictors are removed during sanitization, the selector is recomputed to ensure consistency, as removed predictors may have contributed to masking certain pixels. .. seealso:: :func:`compute_model` Apply the fitted weights to produce a model raster. :func:`get_XT_X` Compute ``X.T @ X`` used in the normal equations. :func:`get_optimal_betas` Compute regression coefficients given ``Y = (X.T @ X)^{-1}``. :func:`calculate_rmse` Evaluate model fit using RMSE. :func:`get_XT_X_dependency` Check for linear dependencies without full fitting. .. rubric:: Examples >>> # Basic usage with uniform block size >>> predictors = [band1, band2, band3] >>> weights = compute_weights( ... response='response.tif', ... predictors=predictors, ... block_size=(512, 512), ... include_intercept=True, ... verbose=True ... ) >>> weights {: 0.523, : 1.245, : -0.334, 'intercept': 5.12} >>> # Use different block sizes for different steps >>> block_sizes = { ... 'prepare_selector': (1024, 1024), ... 'get_XT_X': (512, 512), ... 'get_optimal_betas': (256, 256) ... } >>> weights = compute_weights( ... response='response.tif', ... predictors=predictors, ... block_size=block_sizes, ... limit_contribution=0.05, ... sanitize_predictors=True, ... nbrcpu=4 ... ) >>> # Detect and return linear dependencies >>> deps = compute_weights( ... response='response.tif', ... predictors=predictors, ... block_size=(512, 512), ... return_linear_dependent_predictors=True ... ) >>> if isinstance(deps, dict) and all(isinstance(v, str) for v in deps.values()): ... print(f"Linear dependencies found: {deps}") ... else: ... print(f"Optimal weights: {deps}") .. py:function:: get_XT_X(response, *predictors, selector, include_intercept = True, verbose = False, **mpc_params) Calculate :math:`X^T X` matrix in parallel from predictor data blocks. This function computes the transpose-product matrix :math:`X^T X` used in linear regression by processing the predictor data in parallel blocks. The response parameter is only used to determine the spatial dimensions of the computation. :param response: Path to a tif file or Band object containing the response data. Only the spatial dimensions (width, height) are used; the actual response values are not required for this computation. :type response: str or Band :param \*predictors: Variable number of predictor bands to include in the design matrix X. Can be Band objects or paths to source files. See :func:`~coonfit.inference.prepare_predictors` for details on predictor specification. :type \*predictors: Band or str :param selector: Boolean array (:class:`numpy.bool_`) to select usable cells. Must have the same spatial dimensions as the response. True values indicate pixels to include in the computation. :type selector: NDArray :param include_intercept: If True, append a column of ones to the design matrix X to fit an intercept term. Default is True. :type include_intercept: bool, optional :param verbose: If True, print runtime information including number of workers used. Default is False. :type verbose: bool, optional :param \*\*mpc_params: Multiprocessing configuration parameters. Required: - view_size : tuple of int Size (width, height) in pixels of a single view/block to process. Optional: - nbrcpu : int, optional Number of CPUs to use. If not set, uses (available threads - 1). - start_method : str, optional Starting method for multiprocessing ('fork', 'spawn', or 'forkserver'). :type \*\*mpc_params: dict :returns: **XT_X** -- The transpose-product matrix :math:`X^T X` of shape (n_predictors, n_predictors). If `include_intercept=True`, the shape is (n_predictors+1, n_predictors+1) with the intercept column included as the last row and column. :rtype: NDArray .. rubric:: Notes The function implements parallel computation by: 1. Dividing the spatial domain into non-overlapping blocks via :func:`~riogrande.prepare.create_views` 2. Computing partial :math:`X^T X` matrices for each block in parallel via :func:`~coonfit.parallel_helpers._partial_transposed_product` 3. Aggregating the partial results via :func:`~coonfit.parallel_helpers._combine_matrices` This approach is memory-efficient for large spatial datasets as it avoids loading the entire design matrix into memory at once. The selector mask is applied consistently across all blocks to ensure only valid pixels contribute to the computation. .. seealso:: :func:`get_optimal_betas` Compute beta coefficients from ``X.T @ X``. :func:`compute_weights` Full workflow wrapping this function. .. py:function:: get_XT_X_dependency(response, predictors, block_size, include_intercept = True, limit_contribution = 0.0, no_data = 0.0, sanitize_predictors = False, verbose = False, **params) Test predictors for linear dependency before fitting multiple linear regression. This function checks whether predictor columns are linearly dependent by computing the :math:`X^T X` matrix and analyzing its rank. Linear dependencies can cause numerical instability or singularity in regression fitting and should be resolved before proceeding with model estimation. :param response: Path to a tif file or Band object containing the response data. Used to determine spatial dimensions and create the selector mask. :type response: str or Band :param predictors: Collection of predictor bands to test for linear dependency. See :func:`~coonfit.inference.prepare_predictors` for details on predictor specification. :type predictors: Collection of Band :param block_size: Block sizes for parallel processing functions. - If tuple: (width, height) in pixels, applied to all processing steps. - If dict: Maps function names to specific block sizes with keys: 'prepare_selector', 'get_XT_X'. Each value should be a tuple (width, height). :type block_size: tuple of int or dict of {str: tuple of int} :param include_intercept: If True, include an intercept term when computing ``X.T @ X``. Default is True. :type include_intercept: bool, optional :param limit_contribution: Minimum fraction of valid cells each predictor must contribute to be considered valid. Value between 0.0 and 1.0. Default is 0.0 (a single valid value is sufficient). :type limit_contribution: float, optional :param no_data: Value representing invalid or missing data. Cells with this value are excluded from analysis. Default is 0.0. :type no_data: int or float, optional :param sanitize_predictors: If True, automatically remove predictors that fail the contribution threshold. If False, raise an exception when invalid predictors are found. Default is False. :type sanitize_predictors: bool, optional :param verbose: If True, print processing step information. Default is False. :type verbose: bool, optional :param \*\*params: Optional multiprocessing arguments: - nbrcpu : int, optional Number of CPUs to use. If not set, uses (available CPUs - 1). - start_method : str, optional Process start method: 'spawn', 'fork', or 'forkserver'. :type \*\*params: dict :returns: **dependencies** -- Dictionary mapping each rank-deficient predictor to a description of its linear dependency. Returns None if the selector masks all pixels (no data to fit). If no linear dependencies are found, returns an empty dictionary. Rank deficiency is checked using :func:`~coonfit.helper.check_rank_deficiency`. :rtype: dict of {Band: str} or None .. seealso:: :func:`compute_weights` Full fitting workflow using this dependency check. :func:`get_XT_X` Compute the ``X.T @ X`` matrix analyzed here. .. py:function:: get_optimal_betas(*predictors, Y, response, selector, include_intercept=True, verbose = False, as_dtype=np.float64, **mpc_params) Calculate optimal regression coefficients (:math:`\beta`) in parallel from spatial data. This function computes the optimal weights ((:math:`\beta`) coefficients) for a multiple linear regression by processing predictor data in parallel blocks. The computation solves for beta in the normal equation (ordinary least squares problem): .. math:: \beta = (X^T X)^{-1} X^T y where :math:`X` is the design matrix of predictors and :math:`y` is the response vector. This approach is memory-efficient for large spatial datasets as it avoids loading the entire design matrix into memory at once. :param \*predictors: Variable number of predictor bands to include in the regression. Can be Band objects or paths to source files. See :func:`~coonfit.inference.prepare_predictors` for details on predictor specification. :type \*predictors: Band or str :param Y: Pre-inverted transpose-product matrix, :math:`(X^T X)^{-1}`, of shape (n_predictors, n_predictors). Typically obtained via ``numpy.linalg.inv(get_XT_X(...))``. Shape is (n_predictors+1, n_predictors+1) if `include_intercept=True`. :type Y: NDArray :param response: Path to a tif file or Band object containing the response data. Used to determine spatial dimensions for block processing. :type response: str or Band :param selector: Boolean array (:class:`numpy.bool_`) to select usable cells. Must have the same spatial dimensions as the response. True values indicate pixels to include in the regression. :type selector: NDArray :param include_intercept: If True, fit an intercept term by appending a column of ones to the design matrix X. The intercept will be included in the returned dictionary with key 'intercept'. Default is True. :type include_intercept: bool, optional :param verbose: If True, print runtime information including number of workers, predictors, and computed beta values. Default is False. :type verbose: bool, optional :param as_dtype: Data type to use for internal computations. Default is np.float64. :type as_dtype: dtype, optional :param \*\*mpc_params: Multiprocessing configuration parameters. Required: - view_size : tuple of int Size (width, height) in pixels of a single view/block to process. Optional: - nbrcpu : int, optional Number of CPUs to use. If not set, uses (available threads - 1). - start_method : str, optional Starting method for multiprocessing ('fork', 'spawn', or 'forkserver'). :type \*\*mpc_params: dict :returns: **optimal_weights** -- Dictionary mapping each predictor to its optimal regression coefficient. If `include_intercept=True`, includes an additional entry with key 'intercept' for the intercept term (beta_0). :rtype: dict of {Band or str: float} :raises ValueError: If the number of computed beta values does not match the number of predictors (plus intercept if applicable). .. rubric:: Notes The function implements parallel computation by: 1. Dividing the spatial domain into non-overlapping blocks via :func:`~riogrande.prepare.create_views` 2. Computing partial contributions for each block via :func:`~coonfit.parallel_helpers._partial_optimal_betas` 3. Aggregating the partial results via :func:`~coonfit.parallel_helpers._combine_matrices` .. seealso:: :func:`get_XT_X` Compute ``X.T @ X``, whose inverse is passed as ``Y``. :func:`compute_weights` Full workflow wrapping this function. .. rubric:: Examples >>> # Compute optimal betas with intercept >>> selector = np.ones((1000, 1000), dtype=bool) >>> Y = np.linalg.inv(XT_X) # Pre-inverted (X.T @ X)^{-1} >>> weights = get_optimal_betas( ... band1, band2, band3, ... Y=Y, ... response='response.tif', ... selector=selector, ... include_intercept=True, ... view_size=(512, 512), ... nbrcpu=4 ... ) >>> weights {: 0.523, : 1.245, : -0.334, 'intercept': 5.12} >>> # Without intercept >>> Y = np.array([10.5, 20.3, 15.7]) >>> weights = get_optimal_betas( ... band1, band2, band3, ... Y=Y, ... response='response.tif', ... selector=selector, ... include_intercept=False, ... view_size=(512, 512) ... ) >>> weights {: 0.523, : 1.245, : -0.334}