# This file is part of PANDORA2D
#
# https://github.com/CNES/Pandora2D
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Module for common base of all MatchingCost methods.
"""
import copy
from abc import ABC, abstractmethod
from collections.abc import Mapping
from typing import cast
import numpy as np
import xarray as xr
from json_checker import And, Checker
from numpy.typing import NDArray
from pandora import matching_cost as pandora_matching_cost
import pandora2d.schema as cst_schema
from pandora2d.criteria import get_criteria_dataarray
from pandora2d.margins import Margins
[docs]
class BaseMatchingCost(ABC):
"""MatchingCost base class."""
def __init__(self, cfg: dict) -> None:
"""
Initialisation of matching_cost class
:param cfg: user_config for matching cost
:return: None
"""
[docs]
self._cfg = self.check_conf(cfg)
[docs]
self._method = self._cfg["matching_cost_method"]
# Cast to int in order to help mypy because self.cfg is a Dict, and it can not know the type of step.
[docs]
self._step_row = cast(int, self._cfg["step"][0])
[docs]
self._step_col = cast(int, self._cfg["step"][1])
[docs]
self._window_size = cast(
int, self._cfg["window_size"]
) # _window_size attribute required to compute HalfWindowMargins
[docs]
self._subpix = cast(int, self._cfg["subpix"])
[docs]
self._spline_order = cast(int, self._cfg["spline_order"])
[docs]
self._float_precision = np.dtype(self._cfg["float_precision"])
[docs]
self.cost_volumes: xr.Dataset | None = None
[docs]
self.shifted_right_images: list[xr.Dataset] = []
@property
[docs]
def schema(self):
return {
# Census is not expected to be used with Pandora2D
"matching_cost_method": And(str, lambda x: x not in ["census"]),
"window_size": And(int, lambda input: input > 0 and (input % 2) != 0),
"step": cst_schema.STEP_SCHEMA,
"spline_order": And(int, lambda y: 1 <= y <= 5),
"subpix": And(int, lambda sp: sp in [1, 2, 4]),
"float_precision": str,
}
@property
[docs]
def defaults(self):
return {
"window_size": 5,
"subpix": 1,
"step": [1, 1],
"spline_order": 1,
"float_precision": "float32",
}
[docs]
def check_conf(self, cfg: dict) -> dict[str, str]:
"""
Check the matching cost configuration
:param cfg: user_config for matching cost
:return: cfg: global configuration
"""
updated_config = self._update_with_default_config_values(cfg)
checker = Checker(self.schema)
checker.validate(updated_config)
return updated_config
[docs]
def _update_with_default_config_values(self, cfg: dict):
return {**self.defaults, **cfg}
@property
[docs]
def cfg(self) -> Mapping[str, str | int | list[int]]:
"""
Get used configuration
:return: cfg: dictionary with all parameters
"""
return self._cfg
@property
[docs]
def step(self) -> list[int]:
"""
Get step [row, col]
:return: step: list with row & col step
"""
return [self._step_row, self._step_col]
@property
[docs]
def window_size(self) -> int:
"""
Get window_size
:return: window_size: window used to compute correlation
"""
return self._window_size
[docs]
def allocate_cost_volumes(
self,
cost_volume_attr: dict,
row: np.ndarray,
col: np.ndarray,
disp_range_row: np.ndarray,
disp_range_col: np.ndarray,
np_data: np.ndarray = None,
) -> xr.Dataset:
"""
Allocate the cost volumes
:param cost_volume_attr: the cost_volume's attributes
:param row: dimension of the image (row)
:param col: dimension of the image (columns)
:param disp_range_row: rows disparity range.
:param disp_range_col: columns disparity range.
:param np_data: 4D numpy.ndarray og cost_volumes. Defaults to None.
:return: cost_volumes: 4D Dataset containing the cost_volumes
"""
# Create the cost volume
if np_data is None:
np_data = np.zeros(
(len(row), len(col), len(disp_range_row), len(disp_range_col)), dtype=self._float_precision
)
cost_volumes = xr.Dataset(
{"cost_volumes": (["row", "col", "disp_row", "disp_col"], np_data)},
coords={"row": row, "col": col, "disp_row": disp_range_row, "disp_col": disp_range_col},
)
cost_volumes.attrs = cost_volume_attr
return cost_volumes
[docs]
def get_cv_row_col_coords(
self, img_row_coordinates: NDArray, img_col_coordinates: NDArray, cfg: dict
) -> tuple[NDArray, NDArray]:
"""
Compute cost_volumes row and col coordinates according to image coordinates
:param img_row_coordinates: row coordinates of left image
:param img_col_coordinates: col coordinates of left image
:param cfg: matching_cost computation configuration
:return: a Tuple of np.ndarray that contains the right coordinates for row and col
"""
# Get updated ROI left/up margin for get_coordinates() method
# To get right coordinates in cost_volume when initial left_margin > cfg["ROI"]["col"]["first"]
# or initial up_margin > cfg["ROI"]["row"]["first"]
# We need to have left_margin = cfg["ROI"]["col"]["first"] and up_margin = cfg["ROI"]["row"]["first"]
cfg_for_get_coordinates = BaseMatchingCost.cfg_for_get_coordinates(cfg)
# Get correct coordinates to be sure to process the first point of ROI
if "ROI" in cfg:
col_coords = pandora_matching_cost.AbstractMatchingCost.get_coordinates(
margin=cfg_for_get_coordinates["ROI"]["margins"][0],
img_coordinates=img_col_coordinates,
step=self._step_col,
)
row_coords = pandora_matching_cost.AbstractMatchingCost.get_coordinates(
margin=cfg_for_get_coordinates["ROI"]["margins"][1],
img_coordinates=img_row_coordinates,
step=self._step_row,
)
else:
row_coords = np.arange(img_row_coordinates[0], img_row_coordinates[-1] + 1, self._step_row)
col_coords = np.arange(img_col_coordinates[0], img_col_coordinates[-1] + 1, self._step_col)
return row_coords, col_coords
[docs]
def get_disp_row_coords(self, img_left: xr.Dataset, margins: Margins) -> NDArray:
"""
Compute cost_volumes row disparity coordinates according to image disparities
:param img_left: xarray.Dataset containing :
- im : 2D (row, col) xarray.DataArray
- msk : 2D (row, col) xarray.DataArray
:param margins: refinement margins
:return: a Tuple of np.ndarray that contains the right coordinates for disparities
"""
# Get min/max row disparity grids
min_row, max_row = img_left.attrs["row_disparity_source"]
# Add refinement margins to disparity grids if needed.
if margins is not None:
min_row -= margins.up
max_row += margins.down
# Array with all row disparities
disps_row = pandora_matching_cost.AbstractMatchingCost.get_disparity_range(min_row, max_row, self._subpix)
return disps_row
[docs]
def get_disp_col_coords(self, img_left: xr.Dataset, margins: Margins) -> NDArray:
"""
Compute cost_volumes col disparity coordinates according to image disparities
:param img_left: xarray.Dataset containing :
- im : 2D (row, col) xarray.DataArray
- msk : 2D (row, col) xarray.DataArray
:param margins: refinement margins
:return: a Tuple of np.ndarray that contains the right coordinates for disparities
"""
# Get min/max col disparity grids
min_col, max_col = img_left.attrs["col_disparity_source"]
# Add refinement margins to disparity grids if needed.
if margins is not None:
min_col -= margins.left
max_col += margins.right
# Array with all col disparities
disps_col = pandora_matching_cost.AbstractMatchingCost.get_disparity_range(min_col, max_col, self._subpix)
return disps_col
@staticmethod
[docs]
def cfg_for_get_coordinates(cfg: dict) -> dict:
"""
Return right configuration to give to get_coordinates or get_coordinates_2d methods.
To get right coordinates in cost_volume when initial left_margin > cfg["ROI"]["col"]["first"]
or initial up_margin > cfg["ROI"]["row"]["first"]
We need to have left_margin = cfg["ROI"]["col"]["first"] and up_margin = cfg["ROI"]["row"]["first"]
:param cfg: user configuration
:return: updated configuration to be sure to process the first point of ROI
when ROI margin > ROI first point (left or up)
"""
new_cfg = copy.deepcopy(cfg)
if "ROI" in cfg:
new_cfg["ROI"]["margins"] = (
min(cfg["ROI"]["margins"][0], cfg["ROI"]["col"]["first"]),
min(cfg["ROI"]["margins"][1], cfg["ROI"]["row"]["first"]),
cfg["ROI"]["margins"][2],
cfg["ROI"]["margins"][3],
)
return new_cfg
@property
@abstractmethod
[docs]
def allocate(
self,
img_left: xr.Dataset,
img_right: xr.Dataset,
cfg: dict,
margins: Margins = None,
) -> None:
"""
Allocate the cost volume
:param img_left: xarray.Dataset containing :
- im : 2D (row, col) xarray.DataArray
- msk : 2D (row, col) xarray.DataArray
:param img_right: xarray.Dataset containing :
- im : 2D (row, col) xarray.DataArray
- msk : 2D (row, col) xarray.DataArray
:param cfg: matching_cost computation configuration
:param margins: refinement margins
:return: None
"""
img_row_coordinates = img_left["im"].coords["row"].values
img_col_coordinates = img_left["im"].coords["col"].values
row_coords, col_coords = self.get_cv_row_col_coords(img_row_coordinates, img_col_coordinates, cfg)
# Get disparity coordinates for cost_volumes
disps_row_coords = self.get_disp_row_coords(img_left, margins)
disps_col_coords = self.get_disp_col_coords(img_left, margins)
grid_attrs = img_left.attrs
grid_attrs.update(
{
"window_size": self._window_size,
"subpixel": self._subpix,
"offset_row_col": int((self._window_size - 1) / 2),
"measure": self._method,
"type_measure": "max",
"disparity_margins": margins,
"step": self.step,
"spline_order_filter": self._spline_order,
}
)
# Allocate 4D cost_volumes
self.cost_volumes = self.allocate_cost_volumes(
grid_attrs, row_coords, col_coords, disps_row_coords, disps_col_coords, None
)
self.cost_volumes["criteria"] = get_criteria_dataarray(img_left, img_right, self.cost_volumes)
self.set_shifted_right_images(img_right)
[docs]
def set_out_of_disparity_range_to_other_value(
self,
img_left: xr.Dataset,
value: int | float,
) -> None:
"""
Put special value in data where the row or column disparity is out of the range defined
by disparity grids.
The operation is done inplace.
:param img_left: left image xarray.Dataset
:param value: value to set on data.
"""
# Select correct rows and columns in case of a step different from 1.
row_cv = self.cost_volumes.row.values
col_cv = self.cost_volumes.col.values
# Row disparity
set_out_of_row_disparity_range_to_other_value(
self.cost_volumes["cost_volumes"],
img_left["row_disparity"].sel(band_disp="min", row=row_cv, col=col_cv).data,
img_left["row_disparity"].sel(band_disp="max", row=row_cv, col=col_cv).data,
value,
self.cost_volumes.attrs["row_disparity_source"],
)
# Column disparity
set_out_of_col_disparity_range_to_other_value(
self.cost_volumes["cost_volumes"],
img_left["col_disparity"].sel(band_disp="min", row=row_cv, col=col_cv).data,
img_left["col_disparity"].sel(band_disp="max", row=row_cv, col=col_cv).data,
value,
self.cost_volumes.attrs["col_disparity_source"],
)
@abstractmethod
[docs]
def set_shifted_right_images(self, img_right: xr.Dataset) -> None:
"""
Compute shifted by subpix right image and assign `shifted_right_images` attribute.
:param img_right: xarray.Dataset containing :
- im : 2D (row, col) xarray.DataArray
- msk : 2D (row, col) xarray.DataArray
:return: None
"""
@abstractmethod
[docs]
def compute_cost_volumes(
self,
img_left: xr.Dataset,
img_right: xr.Dataset,
margins: Margins = None,
) -> xr.Dataset:
"""
Computes the cost volumes
:param img_left: xarray.Dataset containing :
- im : 2D (row, col) xarray.DataArray
- msk : 2D (row, col) xarray.DataArray
:param img_right: xarray.Dataset containing :
- im : 2D (row, col) xarray.DataArray
- msk : 2D (row, col) xarray.DataArray
:param margins: refinement margins
:return: cost_volumes: 4D Dataset containing the cost_volumes
"""
[docs]
def set_out_of_row_disparity_range_to_other_value(
data: xr.DataArray,
min_disp_grid: NDArray[np.floating],
max_disp_grid: NDArray[np.floating],
value: int | float,
global_disparity_range: list[int],
) -> None:
"""
Put special value in data where the row disparity is out of the range defined by disparity grids.
The operation is done inplace.
:param data: cost_volumes to modify.
:param min_disp_grid: grid of min disparity.
:param max_disp_grid: grid of max disparity.
:param value: value to set on data.
:param global_disparity_range: global row disparity range
"""
ndisp_row = data.shape[-2]
# We want to put special value on points that are not in the global disparity range (row_disparity_source)
for disp_row in range(ndisp_row):
masking = np.nonzero(
np.logical_or(
(data.coords["disp_row"].data[disp_row] < min_disp_grid)
& (data.coords["disp_row"].data[disp_row] >= global_disparity_range[0]),
(data.coords["disp_row"].data[disp_row] > max_disp_grid)
& (data.coords["disp_row"].data[disp_row] <= global_disparity_range[1]),
)
)
data.data[masking[0], masking[1], disp_row, :] = value
[docs]
def set_out_of_col_disparity_range_to_other_value(
data: xr.DataArray,
min_disp_grid: NDArray[np.floating],
max_disp_grid: NDArray[np.floating],
value: int | float,
global_disparity_range: list[int],
) -> None:
"""
Put special value in data where the column disparity is out of the range defined
by disparity grids.
The operation is done inplace.
:param data: cost_volumes to modify.
:param min_disp_grid: grid of min disparity.
:param max_disp_grid: grid of max disparity.
:param value: value to set on data.
:param global_disparity_range: global column disparity range
"""
ndisp_col = data.shape[-1]
# We want to put special value on points that are not in the global disparity range (col_disparity_source)
for disp_col in range(ndisp_col):
masking = np.nonzero(
np.logical_or(
(data.coords["disp_col"].data[disp_col] < min_disp_grid)
& (data.coords["disp_col"].data[disp_col] >= global_disparity_range[0]),
(data.coords["disp_col"].data[disp_col] > max_disp_grid)
& (data.coords["disp_col"].data[disp_col] <= global_disparity_range[1]),
)
)
data.data[masking[0], masking[1], :, disp_col] = value