#!/usr/bin/env python
#
# Copyright (c) 2026 Centre National d'Etudes Spatiales (CNES).
# Copyright (c) 2026 CS GROUP France
#
# This file is part of PANDORA2D
#
# https://github.com/CNES/Pandora2D
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
This module contains functions associated to raster images.
"""
# mypy: disable-error-code="attr-defined, no-redef"
# pylint: disable="useless-import-alias,redefined-outer-name"
# xarray.Coordinates corresponds to the latest version of xarray.
# xarray.Coordinate corresponds to the version installed by the artifactory.
# Try/except block to be deleted once the version of xarray has been updated by CNES.
try:
from xarray import Coordinates as Coordinates
except ImportError:
from xarray import Coordinate as Coordinates
import copy
from math import floor
from typing import Any, NamedTuple
import numpy as np
import pandora.img_tools as pandora_img_tools
import xarray as xr
from numpy.typing import NDArray
from rasterio.windows import Window
from scipy.ndimage import shift, zoom
from pandora2d.types import Origin, Step
from pandora2d.common import build_usable_data_mask
[docs]
class Datasets(NamedTuple):
"""NamedTuple to store left and right datasets."""
[docs]
def check_disparities(input_config: dict) -> None:
"""
Do various check against disparities properties.
:param input_config: configuration used to create dataset.
:raises SystemExit: If any check fails.
"""
check_disparity_presence(input_config)
for disparity in [input_config["col_disparity"], input_config["row_disparity"]]:
check_disparity_types(disparity)
[docs]
def check_disparity_presence(input_config: dict) -> None:
"""
Check that disparity keys are not missing from input_config.
:param input_config: configuration used to create dataset.
:raises SystemExit: if one or both keys are missing
"""
missing = {"col_disparity", "row_disparity"} - set(input_config)
if len(missing) == 1:
raise KeyError(f"`{missing.pop()}` is mandatory.")
if len(missing) == 2:
raise KeyError("`col_disparity` and `row_disparity` are mandatory.")
[docs]
def check_disparity_types(disparity: Any) -> None:
"""
Check that disparity a dictionary with keys "init" and range"
where "init" is either:
- an integer
- a path to a grid with integer values
:param disparity: disparity to check
:raises SystemExit: if it does not meet requirements
"""
# Check disparity type
if disparity is None or not isinstance(disparity, dict):
raise ValueError("The input disparity must be a dictionary.")
# Check that dictionary keys are correct
if not set(disparity.keys()) == {"init", "range"}:
raise ValueError("Disparity dictionary should contains keys : init and range", disparity)
# Check that init is an integer or a path to a grid
if not isinstance(disparity["init"], (int, str)):
raise ValueError("Disparity init should be an integer or a path to a grid")
# Check that range value is a positive integer
if disparity["range"] < 0 or not isinstance(disparity["range"], int):
raise ValueError("Disparity range should be an integer greater or equal to 0")
[docs]
def add_disparity_grid(
dataset: xr.Dataset,
col_disparity: dict,
row_disparity: dict,
attributes: dict = None,
right: bool = False,
) -> xr.Dataset:
"""
Add disparity to dataset
:param dataset: xarray dataset
:param col_disparity: Disparity interval for columns
:param row_disparity: Disparity interval for rows
:param attributes: dictionary with attribute parameters
:param right: indicates whether the disparity grid is added to the right dataset
:return: dataset : updated dataset
"""
# Origin corresponds to the first point of the dataset when the disparity is not given as an output directory
# Otherwise, it corresponds to the origin of the ROI saved in the attributes
if attributes is None:
origin = Origin(0, 0)
step = Step(1, 1)
user_invalid_disp = np.nan
else:
origin = Origin(attributes["origin_coordinates"]["row"], attributes["origin_coordinates"]["col"])
step = Step(attributes["step"]["row"], attributes["step"]["col"])
user_invalid_disp = attributes["invalid_disp"]
# Creates min and max disparity grids
col_disp_min_max, col_disp_interval, col_no_data = get_min_max_disp_from_dicts(
dataset, col_disparity, origin, step, user_invalid_disp, right=right
)
row_disp_min_max, row_disp_interval, row_no_data = get_min_max_disp_from_dicts(
dataset, row_disparity, origin, step, user_invalid_disp, right=right
)
# Add disparity grids to dataset
for key, disparity_data, source, no_data in zip(
["col_disparity", "row_disparity"],
[col_disp_min_max, row_disp_min_max],
[col_disp_interval, row_disp_interval],
[col_no_data, row_no_data],
):
dataset[key] = xr.DataArray(
disparity_data,
dims=["band_disp", "row", "col"],
coords={"band_disp": ["min", "max"]},
attrs={"no_data": no_data},
)
dataset.attrs[f"{key}_source"] = source
return dataset
[docs]
def get_min_max_disp_from_dicts(
dataset: xr.Dataset,
disparity: dict,
origin: Origin,
step: Step,
user_invalid_disp: int | float,
right: bool = False,
) -> tuple[NDArray, list, float | None]:
"""
Transforms input disparity dicts with constant init into min/max disparity grids
:param dataset: xarray dataset
:param disparity: input disparity
:param origin: origin of the grid
:param step: step that separates two points in the disparity grid
:param user_invalid_disp: user invalid disparity value
:param right: indicates whether the disparity grid is added to the right dataset
:return: 3D numpy array containing min/max disparity grids and list with disparity source
"""
disparity_dtype = np.float32
disp_min_max = np.full((2, dataset.sizes["row"], dataset.sizes["col"]), user_invalid_disp, dtype=disparity_dtype)
nodata = None
# Creates min and max disparity grids if initial disparity is constant (int)
if isinstance(disparity["init"], int):
disp_interval = [
disparity["init"] * pow(-1, right) - disparity["range"],
disparity["init"] * pow(-1, right) + disparity["range"],
]
disp_min_max[0, ...] = disp_interval[0]
disp_min_max[1, ...] = disp_interval[1]
# Creates min and max disparity grids if initial disparities are variable (grid)
elif isinstance(disparity["init"], str):
# Get dataset coordinates to select correct zone of disparity grids if we are using a ROI
rows = dataset.row.data
cols = dataset.col.data
row_offset = origin.row
col_offset = origin.col
window = Window(cols[0], rows[0], cols.size, rows.size)
# Get disparity data
reader = pandora_img_tools.rasterio_open(disparity["init"])
disp_data = reader.read(1, out_dtype=disparity_dtype, window=window)
# When using disparity maps from a previous execution as ROI,
# the initial disparity grids can be smaller than the image window.
# In this case, we use the entire initial disparity grid and ROI margins are included in disp_min_max.
if disp_data.shape < dataset["im"].data.shape:
disp_data = reader.read(1, out_dtype=disparity_dtype)
# If disp_min_max corresponds to a ROI, we need to convert origin coordinates as index
row_offset -= rows[0]
col_offset -= cols[0]
nodata = reader.meta.get("nodata")
# Work on disp_data to avoid transformation on nodata
is_data_mask = build_usable_data_mask(disp_data, nodata)
# We use np.round to ensure that disp_interval and disp_min_max contains
# integer disparity values in cases where sub-pixel disparity maps are reused as initial disparities
disp_data = np.round(disp_data)
disp_interval = [
np.min(disp_data[is_data_mask] * pow(-1, right) - disparity["range"]),
np.max(disp_data[is_data_mask] * pow(-1, right) + disparity["range"]),
]
last_row_index = row_offset + disp_data.shape[0] * step.row
last_col_index = col_offset + disp_data.shape[1] * step.col
row_slice = np.s_[row_offset : last_row_index : step.row]
col_slice = np.s_[col_offset : last_col_index : step.col]
# Use disparity data to creates min/max grids
disp_min_max[0, row_slice, col_slice] = disp_data * pow(-1, right) - disparity["range"]
disp_min_max[1, row_slice, col_slice] = disp_data * pow(-1, right) + disparity["range"]
# Restore nodata
disp_min_max[0, row_slice, col_slice][~is_data_mask] = disp_data[~is_data_mask]
disp_min_max[1, row_slice, col_slice][~is_data_mask] = disp_data[~is_data_mask]
return disp_min_max, disp_interval, nodata
[docs]
def shift_disp_row_img(img_right: xr.Dataset, dec_row: int) -> xr.Dataset:
"""
Return a Dataset that contains the shifted right images
:param img_right: right Dataset image containing :
- im : 2D (row, col) xarray.Datasat
:param dec_row: the value of shifting for dispy
:return: img_right_shift: Dataset containing the shifted image
"""
# coordinates of images
row = img_right.get("row")
col = img_right.get("col")
# To avoid propagation of nan in shifted data, we use -9999 instead of nan if necessary
no_data = convert_no_data(img_right.attrs["no_data_img"])
# shifted image by scipy
data = shift(img_right["im"].data, (-dec_row, 0), cval=no_data)
# create shifted image dataset
img_right_shift = xr.Dataset({"im": (["row", "col"], data)}, coords={"row": row, "col": col})
# add attributes to dataset
img_right_shift.attrs = {
"no_data_img": no_data,
"valid_pixels": 0, # arbitrary default value
"no_data_mask": 1,
} # arbitrary default value
# Pandora replace all nan values by -9999
no_data_pixels = np.where(data == no_data)
# add mask to the shifted image in dataset
img_right_shift["msk"] = xr.DataArray(
np.full((data.shape[0], data.shape[1]), img_right_shift.attrs["valid_pixels"]).astype(np.int16),
dims=["row", "col"],
)
# associate nan value in mask to the no_data param
img_right_shift["msk"].data[no_data_pixels] = int(img_right_shift.attrs["no_data_mask"])
return img_right_shift
[docs]
def get_margins_values(init_value: int | np.ndarray, range_value: int, margins: list) -> tuple[int, int]:
"""
Generate the values of margins
:param init_value: init value for disparity interval
:param range_value: range value for disparity interval
:param margins: necessary value for margins
:return: Margins value
"""
disp_min, disp_max = get_extrema_disparity(init_value, range_value)
return max(margins[0] - disp_min, 0), max(margins[1] + disp_max, 0)
[docs]
def get_roi_processing(roi: dict, col_disparity: dict, row_disparity: dict) -> dict:
"""
Return a roi which takes disparities into account.
Update cfg roi with new margins.
:param roi: roi in config file
"col": {"first": <value - int>, "last": <value - int>},
"row": {"first": <value - int>, "last": <value - int>},
"margins": [<value - int>, <value - int>, <value - int>, <value - int>]
with margins : left, up, right, down
:param col_disparity: init and range for disparities in columns.
:param row_disparity: init and range for disparities in rows.
"""
new_roi = copy.deepcopy(roi)
disparity_row_init = get_initial_disparity(row_disparity)
disparity_col_init = get_initial_disparity(col_disparity)
# for columns
left, right = get_margins_values(disparity_col_init, col_disparity["range"], [roi["margins"][0], roi["margins"][2]])
# for rows
up, down = get_margins_values(disparity_row_init, row_disparity["range"], [roi["margins"][1], roi["margins"][3]])
new_roi["margins"] = (left, up, right, down)
# Update user ROI with new margins.
roi["margins"] = new_roi["margins"]
return new_roi
[docs]
def remove_roi_margins(dataset: xr.Dataset, cfg: dict):
"""
Remove ROI margins before saving output dataset
:param dataset: dataset containing disparity row and col maps
:param cfg: output configuration of the pandora2d machine
"""
step = cfg["pipeline"]["matching_cost"]["step"]
row = dataset.row.data
col = dataset.col.data
# Initialized indexes to get right rows and columns
left, up, right, down = (0, 0, len(col), len(row))
# Example with col = [8,10,12,14,16], step_col=2, row = [0,4,8,12], step_row=4
# ROI={
# {"col": "first": 10, "last": 14},
# {"row": "first": 0, "last": 10} }
# {"margins": (3,3,3,3)}
# According to ROI, we want new_col=[10,12,14]=col[1:-1]
# with 1=floor((cfg["ROI"]["col"]["first"] - col[0]) / step_col)=left
# and -1=floor((cfg["ROI"]["col"]["last"] - col[-1]) / step_col)=right
# According to ROI, we want new_row=[0,4,8]=row[0:-1]
# with 0=initialized up
# and -1=floor((cfg["ROI"]["row"]["last"] - row[-1]) / step[0])=down
# Get the correct indexes to get the right columns based on the user ROI
if col[0] < cfg["ROI"]["col"]["first"]:
left = floor((cfg["ROI"]["col"]["first"] - col[0]) / step[1])
if col[-1] > cfg["ROI"]["col"]["last"]:
right = floor((cfg["ROI"]["col"]["last"] - col[-1]) / step[1])
# Get the correct indexes to get the right rows based on the user ROI
if row[0] < cfg["ROI"]["row"]["first"]:
up = floor((cfg["ROI"]["row"]["first"] - row[0]) / step[0])
if row[-1] > cfg["ROI"]["row"]["last"]:
down = floor((cfg["ROI"]["row"]["last"] - row[-1]) / step[0])
# Create a new dataset with right rows and columns.
data_variables = {
"row_map": (("row", "col"), dataset["row_map"].data[up:down, left:right]),
"col_map": (("row", "col"), dataset["col_map"].data[up:down, left:right]),
"correlation_score": (("row", "col"), dataset["correlation_score"].data[up:down, left:right]),
"validity": (("row", "col", "criteria"), dataset["validity"].data[up:down, left:right, :]),
}
# Check if the confidence measure exists
if "confidence_measure" in dataset:
data_variables["confidence_measure"] = (
("row", "col"),
dataset["confidence_measure"].data[up:down, left:right],
)
coords = {"row": row[up:down], "col": col[left:right], "criteria": dataset.criteria.values}
new_dataset = xr.Dataset(data_variables, coords)
new_dataset.attrs = dataset.attrs
return new_dataset
[docs]
def row_zoom_img(
img: np.ndarray, ny: int, subpix: int, coords: Coordinates, ind: int, no_data: float | int, order: int = 1
) -> xr.Dataset:
"""
Return a list that contains the shifted right images in row
This method is temporary, the user can then choose the filter for this function
:param img: image to shift
:param ny: row number in data
:param subpix: subpixel precision = (1 or pair number)
:param coords: coordinates for output datasets
:param ind: index of range(subpix)
:param no_data: no_data value in img
:param order: The order of the spline interpolation, default is 1. The order has to be in the range 0-5.
:return: an array that contains the shifted right images in row
"""
shift = 1 / subpix
# For each index, shift the right image for subpixel precision 1/subpix*index
data = zoom(img, ((ny * subpix - (subpix - 1)) / float(ny), 1), order=order)[ind::subpix, :]
# Add a row full of no data at the end of data have the same shape as img
# It enables to use Pandora's compute_cost_volume() methods,
# which only accept left and right images of the same shape.
data = np.pad(data, ((0, 1), (0, 0)), "constant", constant_values=no_data)
row = np.arange(
coords.get("row").values[0] + shift * ind, coords.get("row").values[-1] + 1, step=1
) # type: np.ndarray
return xr.Dataset(
{"im": (["row", "col"], data)},
coords={"row": row, "col": coords.get("col")},
)
[docs]
def col_zoom_img(
img: np.ndarray, nx: int, subpix: int, coords: Coordinates, ind: int, no_data: float | int, order: int = 1
) -> xr.Dataset:
"""
Return a list that contains the shifted right images in col
This method is temporary, the user can then choose the filter for this function
:param img: image to shift
:param nx: col number in data
:param subpix: subpixel precision = (1 or pair number)
:param coords: coordinates for output datasets
:param ind: index of range(subpix)
:param no_data: no_data value in img
:param order: The order of the spline interpolation, default is 1. The order has to be in the range 0-5.
:return: an array that contains the shifted right images in col
"""
shift = 1 / subpix
# For each index, shift the right image for subpixel precision 1/subpix*index
data = zoom(img, (1, (nx * subpix - (subpix - 1)) / float(nx)), order=order)[:, ind::subpix]
# Add a col full of no data at the end of data to have the same shape as img
# It enables to use Pandora's compute_cost_volume() methods,
# which only accept left and right images of the same shape.
data = np.pad(data, ((0, 0), (0, 1)), "constant", constant_values=no_data)
col = np.arange(
coords.get("col").values[0] + shift * ind, coords.get("col").values[-1] + 1, step=1
) # type: np.ndarray
return xr.Dataset(
{"im": (["row", "col"], data)},
coords={"row": coords.get("row"), "col": col},
)
[docs]
def shift_subpix_img(img_right: xr.Dataset, subpix: int, row: bool = True, order: int = 1) -> list[xr.Dataset]:
"""
Return an array that contains the shifted right images
:param img_right: Dataset image containing the image im : 2D (row, col) xarray.Dataset
:param subpix: subpixel precision = (1 or pair number)
:param row: row to shift (otherwise column)
:param order: The order of the spline interpolation, default is 1. The order has to be in the range 0-5.
:return: an array that contains the shifted right images
"""
img_right_shift = [img_right]
if subpix > 1:
# To avoid propagation of nan in shifted data, we use -9999 instead of nan if necessary
no_data = convert_no_data(img_right.attrs["no_data_img"])
for ind in np.arange(1, subpix, dtype=int):
if row:
img_right_shift.append(
row_zoom_img(
img_right["im"].data,
img_right.sizes["row"],
subpix,
img_right.coords,
ind,
no_data,
order,
).assign_attrs(img_right.attrs)
)
else:
img_right_shift.append(
col_zoom_img(
img_right["im"].data,
img_right.sizes["col"],
subpix,
img_right.coords,
ind,
no_data,
order,
).assign_attrs(img_right.attrs)
)
return img_right_shift
[docs]
def shift_subpix_img_2d(img_right: xr.Dataset, subpix: int, order: int = 1) -> list[xr.Dataset]:
"""
Return an array that contains the shifted right images in rows and columns
:param img_right: Dataset image containing the image im : 2D (row, col) xarray.Dataset
:param subpix: subpixel precision = (1 or pair number)
:param column: column to shift (otherwise row)
:param order: The order of the spline interpolation, default is 1. The order has to be in the range 0-5.
:return: an array that contains the shifted right images
"""
# Row shifted images
img_right_shift = shift_subpix_img(img_right, subpix, row=True, order=order)
img_right_shift_2d = []
# Columns shifted images
for _, img in enumerate(img_right_shift):
img_right_shift_2d += shift_subpix_img(img, subpix, row=False, order=order)
return img_right_shift_2d
[docs]
def get_initial_disparity(disparity: dict) -> NDArray | int:
"""
Return initial disparity.
When initial disparity is read from a file, nodata and infinite values are replaced by NaNs.
:param disparity: init and range for disparities in columns.
:return: initial disparity
"""
if isinstance(disparity["init"], str):
reader = pandora_img_tools.rasterio_open(disparity["init"])
disparity_init = reader.read()
nodata = reader.meta.get("nodata")
return np.where(build_usable_data_mask(disparity_init, nodata), disparity_init, np.nan)
return disparity["init"]
[docs]
def get_extrema_disparity(init_value: NDArray | float, range_value: int) -> tuple[int, int]:
"""
Returns [min, max] disparity
:param init_value: initial disparity
:param range_value: disparity exploration value
:return: [min disparity, max disparity]
"""
disp_min = int(np.nanmin(init_value)) - range_value
disp_max = int(np.nanmax(init_value)) + range_value
return disp_min, disp_max
[docs]
def convert_no_data(no_data: float | int) -> float | int:
"""
If no_data is NaN or Inf, return -9999.
Otherwise return no_data.
:param no_data: no data value
:return: updated no data value
"""
if np.isnan(no_data) or np.isinf(no_data):
return -9999
return no_data