"""
Low-level I/O functions for reading and writing GeoTIFF raster data.
This module provides the functional layer beneath :class:`~riogrande.io.models.Source`
and :class:`~riogrande.io.models.Band`. It contains functions for tag management,
band index lookup, block loading with optional rescaling, band writing and
updating, coordinate registration, and LZW compression.
Most functions in this module operate directly on open ``rasterio``
:class:`~rasterio.io.DatasetWriter` or :class:`~rasterio.io.DatasetReader`
objects and are called internally by the :class:`~riogrande.io.models.Source`
and :class:`~riogrande.io.models.Band` class methods.
"""
from __future__ import annotations
import os
import glob
from typing import Any
from math import floor
from numpy.typing import NDArray
import rasterio as rio
from rasterio.io import DatasetWriter
from rasterio.windows import Window
from rasterio.warp import (
calculate_default_transform,
reproject,
Resampling,
)
from ..helper import (
check_crs,
output_filename,
serialize,
deserialize,
sanitize,
match_all,
view_to_window,
)
from .exceptions import (
BandSelectionNoMatchError,
BandSelectionAmbiguousError,
)
def _set_tags(src: DatasetWriter, bidx: int | None = None, ns: str = NS, **tags: Any) -> None:
"""Update tags for a dataset or a single band of a dataset.
Since metadata in a tif file is stored as a string the value of each tag is
serialized and converted to a string with :func:`~riogrande.helper.serialize`.
A tag name must satisfy the python variable naming convention and must be different from ``src``,
``bidx`` and ``ns`` as these are reserved for the arguments of this function.
Existing tags are either kept or updated.
Parameters
----------
src : DatasetWriter
``tif`` file opened with :func:`rasterio.open` in write mode (i.e. ``"w"`` or ``"r+"``)
bidx : int or None
Index of the band to set tags for (starting from 1 as is the convention
in rasterio). If set to ``None`` then the tags are set for the entire
dataset.
ns : str
The namespace to set the tags in.
.. note::
It is dicouraged to change this value from the default as all tagging
related methods of this package use the same default namespace.
**tags : Any
Arbitrary number of keyword arguments that will be set as tags.
The value provided is converted to a string with :func:`~riogrande.helper.serialize`
before the tag is written to the file.
See Also
--------
:func:`~riogrande.io.core._get_tags` : Read and deserialize tags from a dataset or band.
Examples
----------
Setting the tags
- 'category': 1
- foo: 'bar'
on band with index 2 in some opened tif file (`src`) is done with:
```python
set_tags(src=src, bidx=2, category=1, foo='bar')
```
"""
if bidx is None:
bidx = 0
serialized_tags = serialize(tags)
src.update_tags(ns=ns, bidx=bidx, **serialized_tags)
def _get_tags(src: DatasetWriter, bidx: int | None = None, ns: str = NS) -> dict[str, Any]:
"""Get all the tags and deserialize the values
Reads raw tags from the dataset and deserializes them with
:func:`~riogrande.helper.deserialize`.
Parameters
----------
src : DatasetWriter
``tif`` file opened with :func:`rasterio.open`
bidx : int or None
Index of the band to get tags from (starting from 1 as is the convention
in rasterio). If set to ``None`` then the tags for the entire dataset are
returned.
ns : str
The namespace to get the tags from.
.. note::
It is dicouraged to change this value from the default as all tagging
related methods of this package use the same default namespace.
Returns
----------
dict
Tags from queried band are returned in a dictionary form.
See Also
--------
:func:`~riogrande.io.core._set_tags` : Write and serialize tags to a dataset or band.
"""
if bidx is None:
bidx = 0 # get the tags for the files metadata
return deserialize(src.tags(bidx=bidx, ns=ns))
def _find_bidxs(src: DatasetWriter, ns: str = NS, **tags: Any) -> list[int]:
"""Find all bands in src for which all tags match
Iterates over all bands and reads their tags via :func:`~riogrande.io.core._get_tags`,
then checks for full tag agreement with :func:`~riogrande.helper.match_all`.
Parameters
----------
src : DatasetWriter
``tif`` file opened with :func:`rasterio.open`
ns : str
The namespace to set the tags in.
.. note::
It is dicouraged to change this value from the default as all tagging
related methods of this package use the same default namespace.
**tags : Any
Arbitrary number of keyword arguments that will be compared to the tags
of the bands in the dataset.
Returns
----------
list[int]
List of all indexes (integer) for bands where tags match.
See Also
--------
:func:`~riogrande.io.core._get_bidx_by_tag` : Return a single matching band index.
:func:`~riogrande.io.core.get_bands_by_tag` : Search across multiple source files.
"""
_tags = sanitize(tags)
matching_bidxs = []
for bidx in src.indexes:
b_tags = _get_tags(src=src, bidx=bidx, ns=ns)
if match_all(targets=_tags, tags=b_tags):
matching_bidxs.append(bidx)
return matching_bidxs
def _get_bidx_by_tag(src: DatasetWriter, ns: str = NS, **tags: Any) -> None | int:
"""Get the index of the band with matching tags
You can specify an arbitrary number of tags by passing keyword arguments
to this selector. Make sure that the provided tags identify one and only one specific band,
as only a single band index is returned. If no band with matching tags is found,
or if multiple matching bands are found a `BandSelectionNoMatchError` is raised.
To return potentially multiple bands matching, use the `get_bands` method instead.
If no tags are provided then the index of the first band is returned.
Parameters
----------
src : DatasetWriter
``tif`` file opened with :func:`rasterio.open`
ns : str
The namespace to set the tags in.
It is dicouraged to change this value from the default as all tagging
related methods of this package use the same default namespace.
**tags : Any
Arbitrary number of keyword arguments that will be compared to the tags
of the bands in the dataset. If ``indexes`` is provided as tag key then all other tags are ignored and
the indexes are directly passed as band indexes to rasterio.
Returns
----------
int or None
Band index (integer) of band matching provided tags. If no match was found None is returned.
Notes
----------
The values of the provided tags are first serialized and then
deserialized again with :func:`~riogrande.helper.serialize` /
:func:`~riogrande.helper.deserialize` (i.e. via :func:`~riogrande.helper.sanitize`)
before comparing to the tags from the provided file.
The reason for this procedure is the fact that the values of tags are
converted to and stored as strings in the tif metadata.
Serializing the values with :func:`~riogrande.helper.serialize` allows us to know how
arbitrary python objects are converted.
As a consequence, we serialize/deserialize the values of the provided
tags to bring them into the form they will we when loading them from
the tif.
Raises
------
:exc:`~riogrande.io.exceptions.BandSelectionAmbiguousError`
If the tags match more than one band.
:exc:`~riogrande.io.exceptions.BandSelectionNoMatchError`
If no band matches the provided tags.
See Also
--------
:func:`~riogrande.io.core._find_bidxs` : Return all matching band indexes.
:func:`~riogrande.io.core.get_bands_by_tag` : Search across multiple source files.
Examples
----------
Get the band with the tags
- 'category': 1
- foo: 'bar'
>>> bidx = _get_bidx_by_tag(src=src, foo='bar', category=1)
"""
if 'indexes' in tags or not tags:
bidx = tags.get('indexes', 1) # return 1 if nothing is provided
else:
_tags = sanitize(tags)
matching_bidxs = _find_bidxs(src=src, ns=ns, **_tags)
matches = len(matching_bidxs)
if matches > 1:
raise BandSelectionAmbiguousError(
f"The selection\n\t{_tags}\nleads to multiple matches:\n\t{matching_bidxs}"
)
elif matches == 0:
raise BandSelectionNoMatchError(
f"No band matches the tags: {_tags}"
)
bidx = matching_bidxs[0]
return bidx
[docs]
def get_bands_by_tag(source: str, ns: str = NS, **tags: Any) -> list[tuple[str, int]]:
"""Find all bands that match specific tags
This method check the metadata (including those of bands)
in one or several tif files and returns the file paths, as well as,
the band indexes for all bands with matching tags.
Whenever a band has tags that match, the name of the tif file,
as well as, the band index are added to the list of returned
matches.
If the tags are found in the metadata of a dataset the path
to the file and a band index of None are added to the list of
returned matches (this different form the rasterio convention to
that uses bidx=0 for "all bands" - I find that confusing).
Parameters
----------
source : str
A glob pattern string passed to :func:`glob.glob`, leading to (potentially)
multiple source files that will be checked.
ns : str
The namespace to search the tags in. It is dicouraged to change this value from the default as all tagging
related methods of this package use the same default namespace.
**tags : Any
Arbitrary number of keyword arguments that will be compared to the tags
of each tif file.
Returns
----------
list
A list of tuples with source (path) and bandindex entries in tuples.
See Also
--------
:func:`~riogrande.io.core._get_bidx_by_tag` : Find a single band in one open dataset.
:func:`~riogrande.io.core._find_bidxs` : Return all matching band indexes in one dataset.
"""
_tags = sanitize(tags)
_sources = glob.glob(source)
matches = []
for source in _sources:
with rio.open(source, "r") as src:
ds_tags = _get_tags(src=src, bidx=None, ns=ns)
bidxs = _find_bidxs(src=src, ns=ns, **_tags)
if match_all(targets=_tags, tags=ds_tags):
bidxs.append(None) # use bidx None to indicate tags of the file
for bidx in bidxs:
matches.append((source, bidx))
return matches
[docs]
def load_block(source: str, view: None | tuple[int, int, int, int] = None, scaling_params: dict | None = None,
**tags: Any) -> dict[str, Any]:
"""Get a block from a specific band of a ``.tif`` file along with the transform
You can select what band(s) to load by passing keyword arguments as tags
(see `**tags` below) and limit the area to load by passing a view
(converted to a :class:`rasterio.windows.Window` via
:func:`~riogrande.helper.view_to_window`).
Parameters
----------
source : str
The path to the tif file to load
view : tuple[int, int, int, int] or None
An optional tuple (x, y, width, height) defining the area to load.
If ``None`` is provided (the default) then the entire file is loaded.
scaling_params : dict or None
Optional dictionary to set a rescaling of the data.
If provided, the following keywords are accepted:
scaling : tuple[float, float]
Factors to rescale the number of pixels. Values >1 will upscale.
method : rasterio.enums.Resampling
The resampling method. If not provided then
:attr:`rasterio.enums.Resampling.bilinear` is used.
**tags : Any
Arbitrary number of keyword arguments to describe the band to select.
See :func:`~riogrande.io.core._get_bidx_by_tag` for further details.
Returns
-------
dict
data: holding a numpy array with the actual data
transform: an :class:`affine.Affine` object that encodes the transformation used
orig_profile: The profile information of the original .tif file
See Also
--------
:func:`~riogrande.io.core.write_band` : Write data to a specific band.
:func:`~riogrande.helper.view_to_window` : Convert a view tuple to a rasterio Window.
"""
window = view_to_window(view)
with rio.open(source) as img:
bidx = _get_bidx_by_tag(src=img, **tags)
if window is not None:
transform = img.window_transform(window)
width = window.width
height = window.height
else:
transform = img.transform
width = img.width
height = img.height
# perform a re-scaling if needed
if scaling_params:
scaling = scaling_params.get('scaling')
resampling = scaling_params.get('method', Resampling.bilinear)
out_shape = (
img.count,
floor(img.height * scaling[0]),
floor(img.width * scaling[1])
)
else:
out_shape = None
resampling = Resampling.nearest
# read out the desired part
data = img.read(indexes=bidx,
window=window,
out_shape=out_shape,
resampling=resampling)
if scaling_params:
# scale image transform
transform = transform * transform.scale(
(width / data.shape[-1]),
(height / data.shape[-2])
)
return {
'data': data,
'transform': transform,
'orig_profile': img.profile.copy()
}
[docs]
def write_band(src: DatasetWriter, data: NDArray, bidx: int = 1, window: Window | None = None,
**tags: Any) -> None:
"""Write data to a specific band of a tif file and set the tags
Parameters
----------
src : DatasetWriter
``tif`` file opened with :func:`rasterio.open`
data : NDArray
The array to write into the file
bidx : int
Band index to write into the file
window : Window or None
An optional :class:`rasterio.windows.Window` to specify an area to write.
**tags : Any
Arbitrary number of keyword arguments that will be set as tags.
The value provided is converted to a string with :func:`~riogrande.helper.serialize`
via :func:`~riogrande.io.core._set_tags` before being written to the file.
Returns
-------
None
See Also
--------
:func:`~riogrande.io.core.update_band` : Find a band by tags and update it.
:func:`~riogrande.io.core.load_block` : Load a block of data from a band.
"""
src.write(data, indexes=bidx, window=window)
_set_tags(src, bidx=bidx, **tags)
[docs]
def update_band(src: DatasetWriter, data: NDArray, window: Window | None = None, **tags: Any) -> None:
"""Find a specific band and update it with data
This function writes a data array in a band specified with tags,
identified via :func:`~riogrande.io.core._get_bidx_by_tag`.
If no band with the matching tags is found a
:exc:`~riogrande.io.exceptions.BandSelectionNoMatchError` is raised.
Parameters
----------
src : DatasetWriter
``tif`` file opened with :func:`rasterio.open`
data : NDArray
The array to write into the file
window : Window or None
An optional :class:`rasterio.windows.Window` to specify an area to write.
**tags : Any
Arbitrary number of keyword arguments that will be used to find
the band to write into.
Returns
--------
None
See Also
--------
:func:`~riogrande.io.core.write_band` : Write to a band by explicit index.
"""
try:
bidx = _get_bidx_by_tag(src=src, **tags)
except BandSelectionNoMatchError as e:
raise BandSelectionNoMatchError(
"There was no band with matching tags. "
"Either adapt the tags or use `write_band` "
"with a specific band index instead if you "
"want to write and also update the tags."
) from e
else:
src.write(data, indexes=bidx, window=window)
def _export_to_tif(destination: str, data: NDArray, orig_profile: dict, start=(0, 0), **pparams: Any) -> None:
"""Export a np.array to tif, only updating a window if data is smaller
This function will overwrite the dtype of the destination tif with the
value provided in `pparams` or the data type of `data`.
Parameters
----------
destination : str
location to export save the .tif file
data : NDArray
The map to export
start : tuple[int, int]
horizontal and vertical starting coordinate
orig_profile : dict
the profile of the original map
(see https://rasterio.readthedocs.io/en/stable/topics/profiles.html)
**pparams : Any
further parameter to be added to the profile
Returns
--------
None
"""
profile = orig_profile.copy()
# Note: we no longer update the size automatically as for Windows this is
# not correct, pass height and width explicitly to update via pparams
# # update for the correct dimensions
# profile['height'] = data.shape[1]
# profile['width'] = data.shape[0]
# set the dtype explicitly of get it from the data
profile['dtype'] = pparams.pop('dtype', str(data.dtype))
profile.update(pparams)
# write it:
size = data.shape[::-1] # since positions are inverted in numpy
with rio.open(destination, "w", **profile) as dest:
dest.write(data, window=Window(*start, *size), indexes=1)
[docs]
def coregister_raster(source: str, reference: str, output: str | None = None) -> str:
"""Align raster to have identical resolution.
Resolution will be calculated automatically from bounds and height/width of reference raster
using :func:`rasterio.warp.calculate_default_transform`.
Reprojection is performed with :func:`rasterio.warp.reproject` and
:attr:`rasterio.enums.Resampling.nearest`.
CRS compatibility is verified with :func:`~riogrande.helper.check_crs`.
Parameters
----------
source : str
The path to the tif file you want to co-register
reference : str
The path to the tif file with the pixel registration to use as reference for co-registration
output : str or None
The path to write the co-registered map to. If ``None``, a filename is generated
by :func:`~riogrande.helper.output_filename`.
Returns
-------
str:
The name of the file that holds co-registered map
"""
check_crs(source, reference)
if output is None:
output = output_filename(source, out_type="coreg")
with rio.open(source) as src:
src_transform = src.transform
src_nodata = src.nodata
with rio.open(reference) as refsrc:
dst_crs = refsrc.crs
(dst_transform,
dst_width,
dst_height) = calculate_default_transform(src.crs, dst_crs, refsrc.width, refsrc.height, *refsrc.bounds)
dst_kwargs = src.meta.copy()
dst_kwargs.update({"crs": dst_crs,
"transform": dst_transform,
"width": dst_width,
"height": dst_height,
"nodata": src_nodata})
with rio.open(output, "w", **dst_kwargs) as dst:
for bidx in src.indexes:
reproject(
source=rio.band(src, bidx),
destination=rio.band(dst, bidx),
src_transform=src_transform,
src_crs=src.crs,
dst_transform=dst_transform,
dst_crs=dst_crs,
resampling=Resampling.nearest)
return output
[docs]
def compress_tif(source, output: str | None = None, compression: str | None = 'lzw') -> str:
"""Compress tif file with LZW compression
Band tags are copied band-by-band using :func:`~riogrande.io.core._get_tags`
and :func:`~riogrande.io.core._set_tags`.
Parameters
----------
source : str
The path to the tif file you want to compress
output : str or None
Optional path to output file.
If not set, the resulting file will inherit the filename from ``source`` and get
a ``_compress`` appendix via :func:`~riogrande.helper.output_filename`.
If compression is ``'none'``, i.e. no compression the appendix will be ``'_decompressed'``.
compression : str or None
Type of compression to use, default is LZW. See GDAL documentation for details.
Returns
-------
str
The name of the compressed file
See Also
--------
:meth:`~riogrande.io.models.Source.compress` : Convenience wrapper on the :class:`~riogrande.io.models.Source` class.
"""
if compression is None:
compression = 'none'
overwrite = False
if output is None:
if compression != 'none':
output = output_filename(source, "compress")
else:
output = output_filename(source, "decompressed")
elif output == source:
overwrite = True
output = output_filename(source, 'tmp')
with rio.Env():
with rio.open(source) as src:
profile = src.profile
profile.update(compress=compression)
with rio.open(output, 'w', **profile) as dst:
_set_tags(src=dst, bidx=None, **_get_tags(src=src, bidx=None))
for i in range(1, src.count + 1):
for ji, window in src.block_windows(i):
array = src.read(i, window=window)
dst.write(array, i, window=window)
tags = _get_tags(src, bidx=i)
_set_tags(dst, bidx=i, **tags)
band_names = src.descriptions[(i - 1)]
dst.set_band_description(i, band_names)
if overwrite:
os.remove(source)
os.rename(src=output, dst=source)
output = source
return output