Source code for pandora2d.img_tools

#!/usr/bin/env python
#
# Copyright (c) 2026 Centre National d'Etudes Spatiales (CNES).
# Copyright (c) 2026 CS GROUP France
#
# This file is part of PANDORA2D
#
#     https://github.com/CNES/Pandora2D
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
This module contains functions associated to raster images.
"""

# mypy: disable-error-code="attr-defined, no-redef"
# pylint: disable="useless-import-alias,redefined-outer-name"
# xarray.Coordinates corresponds to the latest version of xarray.
# xarray.Coordinate corresponds to the version installed by the artifactory.
# Try/except block to be deleted once the version of xarray has been updated by CNES.
try:
    from xarray import Coordinates as Coordinates
except ImportError:
    from xarray import Coordinate as Coordinates

import copy
from math import floor
from typing import Any, NamedTuple

import numpy as np
import pandora.img_tools as pandora_img_tools
import xarray as xr
from numpy.typing import NDArray
from rasterio.windows import Window
from scipy.ndimage import shift, zoom

from pandora2d.types import Origin, Step
from pandora2d.common import build_usable_data_mask


[docs] class Datasets(NamedTuple): """NamedTuple to store left and right datasets."""
[docs] left: xr.Dataset
[docs] right: xr.Dataset
[docs] def create_datasets_from_inputs( input_config: dict, roi: dict = None, estimation_cfg: dict = None, attributes: dict = None ) -> Datasets: """ Read image and return the corresponding xarray.DataSet :param input_config: configuration used to create dataset. :param roi: dictionary with a roi "col": {"first": <value - int>, "last": <value - int>}, "row": {"first": <value - int>, "last": <value - int>}, "margins": [<value - int>, <value - int>, <value - int>, <value - int>] with margins : left, up, right, down :param estimation_cfg: dictionary containing estimation configuration :param attributes: dictionary with attribute parameters :return: Datasets NamedTuple with two attributes `left` and `right` each containing a xarray.DataSet containing the variables : - im: 2D (row, col) or 3D (band_im, row, col) xarray.DataArray float32 - col_disparity: 3D (disp, row, col) xarray.DataArray float32 - row_disparity: 3D (disp, row, col) xarray.DataArray float32 """ if estimation_cfg is None: check_disparities(input_config) else: input_config["col_disparity"] = {"init": -9999, "range": 0} input_config["row_disparity"] = {"init": -9999, "range": 0} return Datasets( pandora_img_tools.create_dataset_from_inputs(input_config["left"], roi).pipe( add_disparity_grid, input_config["col_disparity"], input_config["row_disparity"], attributes ), pandora_img_tools.create_dataset_from_inputs(input_config["right"], roi).pipe( add_disparity_grid, input_config["col_disparity"], input_config["row_disparity"], attributes, True, ), )
[docs] def check_disparities(input_config: dict) -> None: """ Do various check against disparities properties. :param input_config: configuration used to create dataset. :raises SystemExit: If any check fails. """ check_disparity_presence(input_config) for disparity in [input_config["col_disparity"], input_config["row_disparity"]]: check_disparity_types(disparity)
[docs] def check_disparity_presence(input_config: dict) -> None: """ Check that disparity keys are not missing from input_config. :param input_config: configuration used to create dataset. :raises SystemExit: if one or both keys are missing """ missing = {"col_disparity", "row_disparity"} - set(input_config) if len(missing) == 1: raise KeyError(f"`{missing.pop()}` is mandatory.") if len(missing) == 2: raise KeyError("`col_disparity` and `row_disparity` are mandatory.")
[docs] def check_disparity_types(disparity: Any) -> None: """ Check that disparity a dictionary with keys "init" and range" where "init" is either: - an integer - a path to a grid with integer values :param disparity: disparity to check :raises SystemExit: if it does not meet requirements """ # Check disparity type if disparity is None or not isinstance(disparity, dict): raise ValueError("The input disparity must be a dictionary.") # Check that dictionary keys are correct if not set(disparity.keys()) == {"init", "range"}: raise ValueError("Disparity dictionary should contains keys : init and range", disparity) # Check that init is an integer or a path to a grid if not isinstance(disparity["init"], (int, str)): raise ValueError("Disparity init should be an integer or a path to a grid") # Check that range value is a positive integer if disparity["range"] < 0 or not isinstance(disparity["range"], int): raise ValueError("Disparity range should be an integer greater or equal to 0")
[docs] def add_disparity_grid( dataset: xr.Dataset, col_disparity: dict, row_disparity: dict, attributes: dict = None, right: bool = False, ) -> xr.Dataset: """ Add disparity to dataset :param dataset: xarray dataset :param col_disparity: Disparity interval for columns :param row_disparity: Disparity interval for rows :param attributes: dictionary with attribute parameters :param right: indicates whether the disparity grid is added to the right dataset :return: dataset : updated dataset """ # Origin corresponds to the first point of the dataset when the disparity is not given as an output directory # Otherwise, it corresponds to the origin of the ROI saved in the attributes if attributes is None: origin = Origin(0, 0) step = Step(1, 1) user_invalid_disp = np.nan else: origin = Origin(attributes["origin_coordinates"]["row"], attributes["origin_coordinates"]["col"]) step = Step(attributes["step"]["row"], attributes["step"]["col"]) user_invalid_disp = attributes["invalid_disp"] # Creates min and max disparity grids col_disp_min_max, col_disp_interval, col_no_data = get_min_max_disp_from_dicts( dataset, col_disparity, origin, step, user_invalid_disp, right=right ) row_disp_min_max, row_disp_interval, row_no_data = get_min_max_disp_from_dicts( dataset, row_disparity, origin, step, user_invalid_disp, right=right ) # Add disparity grids to dataset for key, disparity_data, source, no_data in zip( ["col_disparity", "row_disparity"], [col_disp_min_max, row_disp_min_max], [col_disp_interval, row_disp_interval], [col_no_data, row_no_data], ): dataset[key] = xr.DataArray( disparity_data, dims=["band_disp", "row", "col"], coords={"band_disp": ["min", "max"]}, attrs={"no_data": no_data}, ) dataset.attrs[f"{key}_source"] = source return dataset
[docs] def get_min_max_disp_from_dicts( dataset: xr.Dataset, disparity: dict, origin: Origin, step: Step, user_invalid_disp: int | float, right: bool = False, ) -> tuple[NDArray, list, float | None]: """ Transforms input disparity dicts with constant init into min/max disparity grids :param dataset: xarray dataset :param disparity: input disparity :param origin: origin of the grid :param step: step that separates two points in the disparity grid :param user_invalid_disp: user invalid disparity value :param right: indicates whether the disparity grid is added to the right dataset :return: 3D numpy array containing min/max disparity grids and list with disparity source """ disparity_dtype = np.float32 disp_min_max = np.full((2, dataset.sizes["row"], dataset.sizes["col"]), user_invalid_disp, dtype=disparity_dtype) nodata = None # Creates min and max disparity grids if initial disparity is constant (int) if isinstance(disparity["init"], int): disp_interval = [ disparity["init"] * pow(-1, right) - disparity["range"], disparity["init"] * pow(-1, right) + disparity["range"], ] disp_min_max[0, ...] = disp_interval[0] disp_min_max[1, ...] = disp_interval[1] # Creates min and max disparity grids if initial disparities are variable (grid) elif isinstance(disparity["init"], str): # Get dataset coordinates to select correct zone of disparity grids if we are using a ROI rows = dataset.row.data cols = dataset.col.data row_offset = origin.row col_offset = origin.col window = Window(cols[0], rows[0], cols.size, rows.size) # Get disparity data reader = pandora_img_tools.rasterio_open(disparity["init"]) disp_data = reader.read(1, out_dtype=disparity_dtype, window=window) # When using disparity maps from a previous execution as ROI, # the initial disparity grids can be smaller than the image window. # In this case, we use the entire initial disparity grid and ROI margins are included in disp_min_max. if disp_data.shape < dataset["im"].data.shape: disp_data = reader.read(1, out_dtype=disparity_dtype) # If disp_min_max corresponds to a ROI, we need to convert origin coordinates as index row_offset -= rows[0] col_offset -= cols[0] nodata = reader.meta.get("nodata") # Work on disp_data to avoid transformation on nodata is_data_mask = build_usable_data_mask(disp_data, nodata) # We use np.round to ensure that disp_interval and disp_min_max contains # integer disparity values in cases where sub-pixel disparity maps are reused as initial disparities disp_data = np.round(disp_data) disp_interval = [ np.min(disp_data[is_data_mask] * pow(-1, right) - disparity["range"]), np.max(disp_data[is_data_mask] * pow(-1, right) + disparity["range"]), ] last_row_index = row_offset + disp_data.shape[0] * step.row last_col_index = col_offset + disp_data.shape[1] * step.col row_slice = np.s_[row_offset : last_row_index : step.row] col_slice = np.s_[col_offset : last_col_index : step.col] # Use disparity data to creates min/max grids disp_min_max[0, row_slice, col_slice] = disp_data * pow(-1, right) - disparity["range"] disp_min_max[1, row_slice, col_slice] = disp_data * pow(-1, right) + disparity["range"] # Restore nodata disp_min_max[0, row_slice, col_slice][~is_data_mask] = disp_data[~is_data_mask] disp_min_max[1, row_slice, col_slice][~is_data_mask] = disp_data[~is_data_mask] return disp_min_max, disp_interval, nodata
[docs] def shift_disp_row_img(img_right: xr.Dataset, dec_row: int) -> xr.Dataset: """ Return a Dataset that contains the shifted right images :param img_right: right Dataset image containing : - im : 2D (row, col) xarray.Datasat :param dec_row: the value of shifting for dispy :return: img_right_shift: Dataset containing the shifted image """ # coordinates of images row = img_right.get("row") col = img_right.get("col") # To avoid propagation of nan in shifted data, we use -9999 instead of nan if necessary no_data = convert_no_data(img_right.attrs["no_data_img"]) # shifted image by scipy data = shift(img_right["im"].data, (-dec_row, 0), cval=no_data) # create shifted image dataset img_right_shift = xr.Dataset({"im": (["row", "col"], data)}, coords={"row": row, "col": col}) # add attributes to dataset img_right_shift.attrs = { "no_data_img": no_data, "valid_pixels": 0, # arbitrary default value "no_data_mask": 1, } # arbitrary default value # Pandora replace all nan values by -9999 no_data_pixels = np.where(data == no_data) # add mask to the shifted image in dataset img_right_shift["msk"] = xr.DataArray( np.full((data.shape[0], data.shape[1]), img_right_shift.attrs["valid_pixels"]).astype(np.int16), dims=["row", "col"], ) # associate nan value in mask to the no_data param img_right_shift["msk"].data[no_data_pixels] = int(img_right_shift.attrs["no_data_mask"]) return img_right_shift
[docs] def get_margins_values(init_value: int | np.ndarray, range_value: int, margins: list) -> tuple[int, int]: """ Generate the values of margins :param init_value: init value for disparity interval :param range_value: range value for disparity interval :param margins: necessary value for margins :return: Margins value """ disp_min, disp_max = get_extrema_disparity(init_value, range_value) return max(margins[0] - disp_min, 0), max(margins[1] + disp_max, 0)
[docs] def get_roi_processing(roi: dict, col_disparity: dict, row_disparity: dict) -> dict: """ Return a roi which takes disparities into account. Update cfg roi with new margins. :param roi: roi in config file "col": {"first": <value - int>, "last": <value - int>}, "row": {"first": <value - int>, "last": <value - int>}, "margins": [<value - int>, <value - int>, <value - int>, <value - int>] with margins : left, up, right, down :param col_disparity: init and range for disparities in columns. :param row_disparity: init and range for disparities in rows. """ new_roi = copy.deepcopy(roi) disparity_row_init = get_initial_disparity(row_disparity) disparity_col_init = get_initial_disparity(col_disparity) # for columns left, right = get_margins_values(disparity_col_init, col_disparity["range"], [roi["margins"][0], roi["margins"][2]]) # for rows up, down = get_margins_values(disparity_row_init, row_disparity["range"], [roi["margins"][1], roi["margins"][3]]) new_roi["margins"] = (left, up, right, down) # Update user ROI with new margins. roi["margins"] = new_roi["margins"] return new_roi
[docs] def remove_roi_margins(dataset: xr.Dataset, cfg: dict): """ Remove ROI margins before saving output dataset :param dataset: dataset containing disparity row and col maps :param cfg: output configuration of the pandora2d machine """ step = cfg["pipeline"]["matching_cost"]["step"] row = dataset.row.data col = dataset.col.data # Initialized indexes to get right rows and columns left, up, right, down = (0, 0, len(col), len(row)) # Example with col = [8,10,12,14,16], step_col=2, row = [0,4,8,12], step_row=4 # ROI={ # {"col": "first": 10, "last": 14}, # {"row": "first": 0, "last": 10} } # {"margins": (3,3,3,3)} # According to ROI, we want new_col=[10,12,14]=col[1:-1] # with 1=floor((cfg["ROI"]["col"]["first"] - col[0]) / step_col)=left # and -1=floor((cfg["ROI"]["col"]["last"] - col[-1]) / step_col)=right # According to ROI, we want new_row=[0,4,8]=row[0:-1] # with 0=initialized up # and -1=floor((cfg["ROI"]["row"]["last"] - row[-1]) / step[0])=down # Get the correct indexes to get the right columns based on the user ROI if col[0] < cfg["ROI"]["col"]["first"]: left = floor((cfg["ROI"]["col"]["first"] - col[0]) / step[1]) if col[-1] > cfg["ROI"]["col"]["last"]: right = floor((cfg["ROI"]["col"]["last"] - col[-1]) / step[1]) # Get the correct indexes to get the right rows based on the user ROI if row[0] < cfg["ROI"]["row"]["first"]: up = floor((cfg["ROI"]["row"]["first"] - row[0]) / step[0]) if row[-1] > cfg["ROI"]["row"]["last"]: down = floor((cfg["ROI"]["row"]["last"] - row[-1]) / step[0]) # Create a new dataset with right rows and columns. data_variables = { "row_map": (("row", "col"), dataset["row_map"].data[up:down, left:right]), "col_map": (("row", "col"), dataset["col_map"].data[up:down, left:right]), "correlation_score": (("row", "col"), dataset["correlation_score"].data[up:down, left:right]), "validity": (("row", "col", "criteria"), dataset["validity"].data[up:down, left:right, :]), } # Check if the confidence measure exists if "confidence_measure" in dataset: data_variables["confidence_measure"] = ( ("row", "col"), dataset["confidence_measure"].data[up:down, left:right], ) coords = {"row": row[up:down], "col": col[left:right], "criteria": dataset.criteria.values} new_dataset = xr.Dataset(data_variables, coords) new_dataset.attrs = dataset.attrs return new_dataset
[docs] def row_zoom_img( img: np.ndarray, ny: int, subpix: int, coords: Coordinates, ind: int, no_data: float | int, order: int = 1 ) -> xr.Dataset: """ Return a list that contains the shifted right images in row This method is temporary, the user can then choose the filter for this function :param img: image to shift :param ny: row number in data :param subpix: subpixel precision = (1 or pair number) :param coords: coordinates for output datasets :param ind: index of range(subpix) :param no_data: no_data value in img :param order: The order of the spline interpolation, default is 1. The order has to be in the range 0-5. :return: an array that contains the shifted right images in row """ shift = 1 / subpix # For each index, shift the right image for subpixel precision 1/subpix*index data = zoom(img, ((ny * subpix - (subpix - 1)) / float(ny), 1), order=order)[ind::subpix, :] # Add a row full of no data at the end of data have the same shape as img # It enables to use Pandora's compute_cost_volume() methods, # which only accept left and right images of the same shape. data = np.pad(data, ((0, 1), (0, 0)), "constant", constant_values=no_data) row = np.arange( coords.get("row").values[0] + shift * ind, coords.get("row").values[-1] + 1, step=1 ) # type: np.ndarray return xr.Dataset( {"im": (["row", "col"], data)}, coords={"row": row, "col": coords.get("col")}, )
[docs] def col_zoom_img( img: np.ndarray, nx: int, subpix: int, coords: Coordinates, ind: int, no_data: float | int, order: int = 1 ) -> xr.Dataset: """ Return a list that contains the shifted right images in col This method is temporary, the user can then choose the filter for this function :param img: image to shift :param nx: col number in data :param subpix: subpixel precision = (1 or pair number) :param coords: coordinates for output datasets :param ind: index of range(subpix) :param no_data: no_data value in img :param order: The order of the spline interpolation, default is 1. The order has to be in the range 0-5. :return: an array that contains the shifted right images in col """ shift = 1 / subpix # For each index, shift the right image for subpixel precision 1/subpix*index data = zoom(img, (1, (nx * subpix - (subpix - 1)) / float(nx)), order=order)[:, ind::subpix] # Add a col full of no data at the end of data to have the same shape as img # It enables to use Pandora's compute_cost_volume() methods, # which only accept left and right images of the same shape. data = np.pad(data, ((0, 0), (0, 1)), "constant", constant_values=no_data) col = np.arange( coords.get("col").values[0] + shift * ind, coords.get("col").values[-1] + 1, step=1 ) # type: np.ndarray return xr.Dataset( {"im": (["row", "col"], data)}, coords={"row": coords.get("row"), "col": col}, )
[docs] def shift_subpix_img(img_right: xr.Dataset, subpix: int, row: bool = True, order: int = 1) -> list[xr.Dataset]: """ Return an array that contains the shifted right images :param img_right: Dataset image containing the image im : 2D (row, col) xarray.Dataset :param subpix: subpixel precision = (1 or pair number) :param row: row to shift (otherwise column) :param order: The order of the spline interpolation, default is 1. The order has to be in the range 0-5. :return: an array that contains the shifted right images """ img_right_shift = [img_right] if subpix > 1: # To avoid propagation of nan in shifted data, we use -9999 instead of nan if necessary no_data = convert_no_data(img_right.attrs["no_data_img"]) for ind in np.arange(1, subpix, dtype=int): if row: img_right_shift.append( row_zoom_img( img_right["im"].data, img_right.sizes["row"], subpix, img_right.coords, ind, no_data, order, ).assign_attrs(img_right.attrs) ) else: img_right_shift.append( col_zoom_img( img_right["im"].data, img_right.sizes["col"], subpix, img_right.coords, ind, no_data, order, ).assign_attrs(img_right.attrs) ) return img_right_shift
[docs] def shift_subpix_img_2d(img_right: xr.Dataset, subpix: int, order: int = 1) -> list[xr.Dataset]: """ Return an array that contains the shifted right images in rows and columns :param img_right: Dataset image containing the image im : 2D (row, col) xarray.Dataset :param subpix: subpixel precision = (1 or pair number) :param column: column to shift (otherwise row) :param order: The order of the spline interpolation, default is 1. The order has to be in the range 0-5. :return: an array that contains the shifted right images """ # Row shifted images img_right_shift = shift_subpix_img(img_right, subpix, row=True, order=order) img_right_shift_2d = [] # Columns shifted images for _, img in enumerate(img_right_shift): img_right_shift_2d += shift_subpix_img(img, subpix, row=False, order=order) return img_right_shift_2d
[docs] def get_initial_disparity(disparity: dict) -> NDArray | int: """ Return initial disparity. When initial disparity is read from a file, nodata and infinite values are replaced by NaNs. :param disparity: init and range for disparities in columns. :return: initial disparity """ if isinstance(disparity["init"], str): reader = pandora_img_tools.rasterio_open(disparity["init"]) disparity_init = reader.read() nodata = reader.meta.get("nodata") return np.where(build_usable_data_mask(disparity_init, nodata), disparity_init, np.nan) return disparity["init"]
[docs] def get_extrema_disparity(init_value: NDArray | float, range_value: int) -> tuple[int, int]: """ Returns [min, max] disparity :param init_value: initial disparity :param range_value: disparity exploration value :return: [min disparity, max disparity] """ disp_min = int(np.nanmin(init_value)) - range_value disp_max = int(np.nanmax(init_value)) + range_value return disp_min, disp_max
[docs] def convert_no_data(no_data: float | int) -> float | int: """ If no_data is NaN or Inf, return -9999. Otherwise return no_data. :param no_data: no data value :return: updated no data value """ if np.isnan(no_data) or np.isinf(no_data): return -9999 return no_data