Source code for pandora2d.multiscale.model_estimation

# Copyright (c) 2025 Centre National d'Etudes Spatiales (CNES).
#
# This file is part of PANDORA2D
#
#     https://github.com/CNES/Pandora2D
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
This module contains methods associated to the estimation model for the MVP
"""

from typing import Tuple, List, Union, Dict

import numpy as np
import xarray as xr

from numpy.typing import NDArray
from scipy.linalg import cho_factor, cho_solve

[docs] COMPRESSION_FACTOR = 0.5 # factor used to reduce numerical instability
[docs] def get_invalid_disp_mask(row_map: NDArray, col_map: NDArray, invalid_disp: Union[int, float]) -> NDArray: """ Compute mask for points equal to invalid_disp in row_map or col_map. This mask is then used to remove invalid points from position vectors. :param row_map: row disparity map :param col_map: col disparity map :param invalid_disp: invalid disparity value :return: invalid mask for position matrix """ if np.isnan(invalid_disp): mask_invalid = np.isnan(row_map) | np.isnan(col_map) elif np.isinf(invalid_disp): mask_invalid = np.isinf(row_map) | np.isinf(col_map) else: mask_invalid = (row_map == invalid_disp) | (col_map == invalid_disp) return mask_invalid
[docs] def add_mesh_validity_band(dataset_disp_maps: xr.Dataset) -> xr.Dataset: """ Add a 'MESH_validity' band to an existing validity dataset. :param dataset_disp_maps: xarray Dataset containing the validity data and the disparity maps """ # Criteria list with new criteria for MESH_validity new_criteria = list(dataset_disp_maps.coords["criteria"].values) + ["MESH_validity"] # Mesh validity band initialized to 0 dataset_validity_extended = dataset_disp_maps.reindex(criteria=new_criteria, fill_value=0) dataset_disp_maps = dataset_validity_extended return dataset_disp_maps
[docs] def make_position_vectors(dataset_disp_maps: xr.Dataset) -> Tuple[NDArray, NDArray, NDArray, NDArray]: """ Construct initial et final positions vectors using dataset_disp_maps coordinates and disparity maps. To reduce numerical instability, a compression factor is used to create the final and initial position grids. For the least squares problem y=Xb: - X is constructed using init_row_coords and init_col_coords - y is either final_row_coords or final_col_coords. :param dataset_disp_maps: disparity maps dataset :return: initial position vectors and final position vectors """ # dataset_disp_maps xarray coordinates are used to get initial positions # for each point of the image col_coords = dataset_disp_maps["col"].values * COMPRESSION_FACTOR row_coords = dataset_disp_maps["row"].values * COMPRESSION_FACTOR init_col_2d_coords, init_row_2d_coords = np.meshgrid(col_coords, row_coords) # final positions are computed using initial positions # to which we add the disparities calculated by pandora2d final_col_2d_coords = init_col_2d_coords + dataset_disp_maps["col_map"].data * COMPRESSION_FACTOR final_row_2d_coords = init_row_2d_coords + dataset_disp_maps["row_map"].data * COMPRESSION_FACTOR # We remove the points that are invalid_disp either in row_map or in col_map # so as not to distort the calculation of the least squares coefficients mask_invalid = get_invalid_disp_mask( dataset_disp_maps["row_map"].data, dataset_disp_maps["col_map"].data, dataset_disp_maps.attrs["invalid_disp"] ) # Fill mesh validity band in validity dataset according to the number of valid points in the mesh nb_valid = (mask_invalid == 0).sum() value = 0 if nb_valid >= dataset_disp_maps.attrs["minimal_nb_pixels_per_mesh"] else 1 dataset_disp_maps["validity"].sel(criteria="MESH_validity")[:, :] = value init_row_coords = init_row_2d_coords[~mask_invalid] init_col_coords = init_col_2d_coords[~mask_invalid] final_row_coords = final_row_2d_coords[~mask_invalid] final_col_coords = final_col_2d_coords[~mask_invalid] return init_row_coords, init_col_coords, final_row_coords, final_col_coords
[docs] def make_polynomial_design_matrix(row_init_coords: NDArray, col_init_coords: NDArray, degree: int) -> NDArray: """ Construct a 2D polynomial design matrix up to a given degree. It generates all polynomial combination of the input variables col_init_coords and row_init_coords. For the least squares problem y=Xb, X is the design matrix. :param col_init_coords: initial column positions :param row_init_coords: initial row positions :param degree: polynomial degree :return: polynomial design matrix """ exponent_pairs = [(a, b) for a in range(degree + 1) for b in range(degree + 1 - a)] design_matrix = np.column_stack([col_init_coords**b * row_init_coords**a for (a, b) in exponent_pairs]) return design_matrix
[docs] def check_nb_observations(design_matrix: NDArray): """ Check if we have more parameters than observations, in which case a ValueError is raised. :param design_matrix: Design matrix """ if design_matrix.shape[0] < design_matrix.shape[1]: raise ValueError( "To solve the least squares problem, there must be more observations than parameters. " "Please reduce the degree of the polynomial or increase the number of observations." )
[docs] def estimate_model(dataset_disp_maps: xr.Dataset, degree: int) -> Tuple[NDArray, NDArray, NDArray, NDArray]: """ Estimate deformation model from initial positions to final positions by resolving y=Xb. :param degree: polynomial degree :param dataset_disp_maps: disparity maps dataset :return: least square solution and sum of residuals for rows and columns """ # Get initial and final position vectors row_init_coords, col_init_coords, row_final_coords, col_final_coords = make_position_vectors(dataset_disp_maps) # Get design matrix X design_matrix = make_polynomial_design_matrix(row_init_coords, col_init_coords, degree) # Check that we have enough observations compared to the number of parameters check_nb_observations(design_matrix) # Resolve least squares problem y = Xb coefficients_row, sum_sq_residuals_row, _, __ = np.linalg.lstsq(design_matrix, row_final_coords.ravel()) coefficients_col, sum_sq_residuals_col, _, __ = np.linalg.lstsq(design_matrix, col_final_coords.ravel()) return coefficients_row, coefficients_col, sum_sq_residuals_row, sum_sq_residuals_col
[docs] def estimate_model_cholesky( dataset_disp_maps: xr.Dataset, degree: int, lambda_ridge: Union[int, float, None] = None ) -> Tuple[NDArray, NDArray, NDArray, NDArray]: """ Estimate deformation model from initial positions to final positions by resolving y=Xb using Cholesky decomposition. We have X.T*X*b = X.T*y and X.T*X = L*L.T with L a lower triangular matrix. Then, using Cholesky decomposition we can resolve L*z=X.T*y and L.T*b = z. If a value is specified for lamba_ridge, Ridge regularization is used. :param dataset_disp_maps: disparity maps dataset :param degree: polynomial degree :param lambda_ridge: Ridge regularization factor :return: least square solution and sum of residuals for rows and columns """ # Get initial and final position vectors row_init_coords, col_init_coords, row_final_coords, col_final_coords = make_position_vectors(dataset_disp_maps) # Get design matrix X design_matrix = make_polynomial_design_matrix(row_init_coords, col_init_coords, degree) # Check that we have enough observations compared to the number of parameters check_nb_observations(design_matrix) # Build the normal matrix (X.T*X) and right-hand sides (X.T*y) x_transpose_x = np.dot(design_matrix.T, design_matrix).astype(float) x_transpose_y_row = np.dot(design_matrix.T, row_final_coords.ravel()) x_transpose_y_col = np.dot(design_matrix.T, col_final_coords.ravel()) # Add ridge penalty if the lambda_ridge parameter is specified # then compute Cholesky matrix L such as X.T*X = L*L.T if lambda_ridge is not None: x_transpose_x += lambda_ridge * np.eye(design_matrix.shape[1]) # Compute the Cholesky factorization (L is lower triangular) c, low = cho_factor(x_transpose_x, lower=True) # Resolve Cholesky system coefficients_row = cho_solve((c, low), x_transpose_y_row) coefficients_col = cho_solve((c, low), x_transpose_y_col) # Compute sum of squared residuals sum_sq_residuals_row = np.sum((row_final_coords.ravel() - np.dot(design_matrix, coefficients_row)) ** 2) sum_sq_residuals_col = np.sum((col_final_coords.ravel() - np.dot(design_matrix, coefficients_col)) ** 2) return coefficients_row, coefficients_col, sum_sq_residuals_row, sum_sq_residuals_col
[docs] def estimate_init_disparity_grids( dataset_disp_maps: xr.Dataset, coefficients_row: NDArray, coefficients_col: NDArray, degree: int, next_resolution_shape: Tuple, ) -> Tuple[NDArray, NDArray]: """ Estimate initial disparity grids (rows and columns) according to scale factor and coefficients of least squares resolution. :param dataset_disp_maps: disparity maps dataset :param coefficients_row: row coefficients computed by least squares resolution :param coefficients_col: col coefficients computed by least squares resolution :param degree: polynomial degree :param next_resolution_shape: shape of image for next resolution :return: initial disparity grids for rows and columns """ # Get resampled coordinates according to scale factor # To reduce numerical instability, a compression factor is used to create resampled coordinates scaled_row = ( np.linspace( dataset_disp_maps.coords["row"].values[0], dataset_disp_maps.coords["row"].values[-1] + 1, next_resolution_shape[0], endpoint=False, ) * COMPRESSION_FACTOR ) scaled_col = ( np.linspace( dataset_disp_maps.coords["col"].values[0], dataset_disp_maps.coords["col"].values[-1] + 1, next_resolution_shape[1], endpoint=False, ) * COMPRESSION_FACTOR ) # We add 0.5 to scaled_col and scaled_row because we want to consider the center of the pixels # and not their upper left corner. scaled_row += 0.5 * COMPRESSION_FACTOR scaled_col += 0.5 * COMPRESSION_FACTOR # Get initial positions for resampled coordinates scaled_col_2d, scaled_row_2d = np.meshgrid(scaled_col, scaled_row) # Get design matrix for resampled initial positions design_matrix = make_polynomial_design_matrix(scaled_row_2d.ravel(), scaled_col_2d.ravel(), degree) # Compute the final position grids estimated using least squares coefficients estimated_final_row = np.dot(design_matrix, coefficients_row) estimated_final_col = np.dot(design_matrix, coefficients_col) # Reshape estimated final disparity grids estimated_final_row_grid = estimated_final_row.reshape(scaled_row_2d.shape) estimated_final_col_grid = estimated_final_col.reshape(scaled_col_2d.shape) # Compute estimated initial disparity grid for next resolution by subtracting the resampled initial position, # multiplying by the scale factor and dividing by the compression factor scale_factor_row = np.round(next_resolution_shape[0] / len(dataset_disp_maps.coords["row"].values)) scale_factor_col = np.round(next_resolution_shape[1] / len(dataset_disp_maps.coords["col"].values)) estimated_init_row_grid = np.round( (estimated_final_row_grid - scaled_row_2d) * scale_factor_row / COMPRESSION_FACTOR ) estimated_init_col_grid = np.round( (estimated_final_col_grid - scaled_col_2d) * scale_factor_col / COMPRESSION_FACTOR ) return estimated_init_row_grid, estimated_init_col_grid
[docs] def get_next_shape_mesh_list(next_resolution_shape: int, nb_mesh: int) -> List[int]: """ Return list of shape for mesh for next resolution. This list is then used to estimate initial disparity grid at next resolution with the right shape for each mesh. :param next_resolution_shape: next resolution shape (row or column) :param nb_mesh: number of mesh (row or column) :return: List of shape by mesh (row or column) """ condition_add_one_pixel = nb_mesh - (next_resolution_shape % nb_mesh) next_shape_list = [ next_resolution_shape // nb_mesh + (1 if mesh >= condition_add_one_pixel else 0) for mesh in range(nb_mesh) ] return next_shape_list
[docs] def concatenate_estimated_grids(estimated_init_grid_list: List[NDArray], nb_row_mesh: int, nb_col_mesh: int) -> NDArray: """ Concatenate the estimated disparity grids computed by mesh and return the full initial disparity grid for next resolution. :param estimated_init_grid_list: :param nb_row_mesh: number of mesh for rows :param nb_col_mesh: number of mesh for columns :return: full estimated initial disparity grid for next resolution """ estimated_init_grid = np.concatenate( [ np.concatenate(estimated_init_grid_list[mesh_row * nb_col_mesh : (mesh_row + 1) * nb_col_mesh], axis=1) for mesh_row in range(nb_row_mesh) ], axis=0, ) return estimated_init_grid
[docs] def get_init_disparity_grids( dataset_disp_maps: xr.Dataset, multiscale_cfg: Dict, next_resolution_shape: Tuple ) -> Tuple[NDArray, NDArray, NDArray, NDArray]: """ Return initial disparity grid after computing least square coefficients :param dataset_disp_maps: disparity maps dataset :param multiscale_cfg: multiscale pipeline configuration :param next_resolution_shape: shape of image for next resolution :return: initial disparity grids for rows and columns and sum of squared residuals """ # Estimate model coefficients_row, coefficients_col, sum_sq_residuals_row, sum_sq_residuals_col = estimate_model_cholesky( dataset_disp_maps, multiscale_cfg["model"]["degree"], lambda_ridge=1.0e-6 ) # Estimate initial disparity grids for next resolution estimated_init_row_grid, estimated_init_col_grid = estimate_init_disparity_grids( dataset_disp_maps, coefficients_row, coefficients_col, multiscale_cfg["model"]["degree"], next_resolution_shape, ) return estimated_init_row_grid, estimated_init_col_grid, sum_sq_residuals_row, sum_sq_residuals_col
[docs] def get_init_disparity_grids_with_mesh( dataset_disp_maps: xr.Dataset, multiscale_cfg: Dict, next_resolution_shape: Tuple ) -> Tuple[NDArray, NDArray, NDArray, NDArray, xr.Dataset]: """ Return initial disparity grid after computing least square coefficients for each mesh :param dataset_disp_maps: disparity maps dataset :param multiscale_cfg: multiscale pipeline configuration :param next_resolution_shape: shape of image for next resolution :return: initial disparity grids for rows and columns and RMSE for row and columns """ # Add mesh validity band to dataset_disp_maps dataset_disp_maps = add_mesh_validity_band(dataset_disp_maps) # Tile size used to divide the disparty grid nb_row_mesh = multiscale_cfg["mesh"]["row"] nb_col_mesh = multiscale_cfg["mesh"]["col"] # Disparity map shape rows, cols = dataset_disp_maps["row_map"].data.shape # Euclidean division: integer part (quotient) rows_in_mesh = rows // nb_row_mesh cols_in_mesh = cols // nb_col_mesh estimated_init_row_grid_list = [] estimated_init_col_grid_list = [] # Get shape for next resolution for each mesh rows_next_shape_list = get_next_shape_mesh_list(next_resolution_shape[0], nb_row_mesh) cols_next_shape_list = get_next_shape_mesh_list(next_resolution_shape[1], nb_col_mesh) sum_sq_residuals_row_total = 0.0 sum_sq_residuals_col_total = 0.0 for mesh_row in range(nb_row_mesh): for mesh_col in range(nb_col_mesh): row_start = mesh_row * rows_in_mesh col_start = mesh_col * cols_in_mesh row_end = rows if mesh_row == nb_row_mesh - 1 else (mesh_row + 1) * rows_in_mesh col_end = cols if mesh_col == nb_col_mesh - 1 else (mesh_col + 1) * cols_in_mesh # Select sub dataset corresponding to each mesh sub_dataset = dataset_disp_maps.isel(row=slice(row_start, row_end), col=slice(col_start, col_end)) # Compute initial disparity grid by mesh sub_estimated_init_row_grid, sub_estimated_init_col_grid, sum_sq_residuals_row, sum_sq_residuals_col = ( get_init_disparity_grids( sub_dataset, multiscale_cfg, (rows_next_shape_list[mesh_row], cols_next_shape_list[mesh_col]) ) ) estimated_init_row_grid_list.append(sub_estimated_init_row_grid) estimated_init_col_grid_list.append(sub_estimated_init_col_grid) # Cast in float to avoir mypy error sum_sq_residuals_row_total += float(sum_sq_residuals_row) sum_sq_residuals_col_total += float(sum_sq_residuals_col) # Concatenate mesh to get full initial disparity grids estimated_init_row_grid = concatenate_estimated_grids(estimated_init_row_grid_list, nb_row_mesh, nb_col_mesh) estimated_init_col_grid = concatenate_estimated_grids(estimated_init_col_grid_list, nb_row_mesh, nb_col_mesh) # Compute RMSE rmse_row = np.sqrt(sum_sq_residuals_row_total / (dataset_disp_maps["row_map"].size)) rmse_col = np.sqrt(sum_sq_residuals_col_total / (dataset_disp_maps["row_map"].size)) return estimated_init_row_grid, estimated_init_col_grid, rmse_row, rmse_col, dataset_disp_maps