Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RKHS inner product #550

Open
wants to merge 13 commits into
base: develop
Choose a base branch
from
2 changes: 2 additions & 0 deletions skfda/misc/covariances.py
Original file line number Diff line number Diff line change
Expand Up @@ -804,6 +804,8 @@ def __call__(self, x: ArrayLike, y: ArrayLike) -> NDArrayFloat:
Returns:
Covariance function evaluated at the grid formed by x and y.
"""
x = _transform_to_2d(x)
y = _transform_to_2d(y)
m5signorini marked this conversation as resolved.
Show resolved Hide resolved
return self.cov_fdata([x, y], grid=True)[0, ..., 0]


Expand Down
216 changes: 216 additions & 0 deletions skfda/misc/rkhs_product.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
"""RKHS inner product of two FData objects."""
from __future__ import annotations

from typing import Callable, Optional

import multimethod
import numpy as np
import scipy.linalg

from ..misc.covariances import EmpiricalBasis
from ..representation import FData, FDataBasis, FDataGrid
from ..representation.basis import Basis, TensorBasis
from ..typing._numpy import NDArrayFloat
from ._math import inner_product
from .validation import check_fdata_dimensions


def _check_compatible_fdata(
m5signorini marked this conversation as resolved.
Show resolved Hide resolved
fdata1: FData,
fdata2: FData,
) -> None:
"""Check terms of the inner product are compatible.

Args:
fdata1: First functional data object.
fdata2: Second functional data object.

Raises:
ValueError: If the data is not univariate or if the number of samples
is not the same.
"""
check_fdata_dimensions(fdata1, dim_domain=1, dim_codomain=1)
check_fdata_dimensions(fdata2, dim_domain=1, dim_codomain=1)
if fdata1.n_samples != fdata2.n_samples:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this prevent that one of the FData has just one sample, for broadcasting?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, I forgot considering broadcasting. I'll include it with the rest of changes and suggestions; working on it.

raise ValueError(
"Invalid number of samples for functional data objects:"
f"{fdata1.n_samples} != {fdata2.n_samples}.",
)


def _get_coeff_matrix(
cov_function: Callable[[NDArrayFloat, NDArrayFloat], NDArrayFloat],
basis1: Basis,
basis2: Basis,
) -> NDArrayFloat:
"""Return the matrix of coefficients of a function in a tensor basis.

Args:
cov_function: Covariance function as a callable. It is expected to
receive two arrays, s and t, and return the corresponding
covariance matrix.
basis1: First basis.
basis2: Second basis.

Returns:
Matrix of coefficients of the covariance function in the tensor
basis formed by the two given bases.
"""
# In order to use inner_product, the callable must follow the
# same convention as the evaluation on FDataGrids, that is, it
# is expected to receive a single bidimensional point as input
def cov_function_pointwise( # noqa: WPS430
x: NDArrayFloat,
) -> NDArrayFloat:
t = np.array([x[0]])
s = np.array([x[1]])
return cov_function(t, s)[..., np.newaxis]

tensor_basis = TensorBasis([basis1, basis2])

# Solving the system yields the coefficients of the covariance
# as a vector that is reshaped to form a matrix
return np.linalg.solve(
tensor_basis.gram_matrix(),
inner_product(
cov_function_pointwise,
tensor_basis,
_domain_range=tensor_basis.domain_range,
).T,
).reshape(basis1.n_basis, basis2.n_basis)


@multimethod.multidispatch
def rkhs_inner_product(
m5signorini marked this conversation as resolved.
Show resolved Hide resolved
fdata1: FDataGrid,
fdata2: FDataGrid,
*,
cov_function: Callable[[NDArrayFloat, NDArrayFloat], NDArrayFloat],
cond: Optional[float] = None,
) -> NDArrayFloat:
r"""RKHS inner product of two univariate FDataGrid objects.

This is the most general method for calculating the RKHS inner product
using a discretization of the domain. The RKHS inner product is calculated
as the inner product between the square root inverse of the covariance
operator applied to the functions.
Assuming the existence of such inverse, using the self-adjointness of the
covariance operator, the RKHS inner product can be calculated as:
\langle f, \mathcal{K}^{-1} g \rangle
When discretizing a common domain, this is equivalent to doing the matrix
product between the discretized functions and the inverse of the
covariance matrix.
In case of having different discretization grids, the left inverse of the
transposed covariace matrix is used instead.

Args:
fdata1: First FDataGrid object.
fdata2: Second FDataGrid object.
cov_function: Covariance function as a callable. It is expected to
receive two arrays, s and t, and return the corresponding
covariance matrix.
cond: Cutoff for small singular values of the covariance matrix.
Default uses scipy default.

Returns:
Matrix of the RKHS inner product between paired samples of
fdata1 and fdata2.
"""
_check_compatible_fdata(fdata1, fdata2)

data_matrix_1 = fdata1.data_matrix
data_matrix_2 = fdata2.data_matrix
grid_points_1 = fdata1.grid_points[0]
grid_points_2 = fdata2.grid_points[0]
cov_matrix_1_2 = cov_function(grid_points_1, grid_points_2)

# Calculate the inverse operator applied to fdata2
if np.array_equal(grid_points_1, grid_points_2):
inv_fd2_matrix = np.linalg.solve(
cov_matrix_1_2,
data_matrix_2,
)
else:
inv_fd2_matrix = np.asarray(
vnmabus marked this conversation as resolved.
Show resolved Hide resolved
scipy.linalg.lstsq(
cov_matrix_1_2.T,
data_matrix_2[..., 0].T,
cond=cond,
)[0],
vnmabus marked this conversation as resolved.
Show resolved Hide resolved
).T[..., np.newaxis]
vnmabus marked this conversation as resolved.
Show resolved Hide resolved

products = (
np.transpose(data_matrix_1, axes=(0, 2, 1))
@ inv_fd2_matrix
)
# Remove redundant dimensions
return products.reshape(products.shape[0])


@rkhs_inner_product.register(FDataBasis, FDataBasis)
def rkhs_inner_product_fdatabasis(
fdata1: FDataBasis,
fdata2: FDataBasis,
*,
cov_function: Callable[[NDArrayFloat, NDArrayFloat], NDArrayFloat],
) -> NDArrayFloat:
"""RKHS inner product of two FDataBasis objects.

In case of using a basis expression, the RKHS inner product can be
computed obtaining first a basis representation of the covariance
function in the tensor basis. Then, the inverse operator applied to
the second term is calculated.
In case of using a common basis, that step is done by solving the
system given by the inverse of the matrix of coefficients of the
covariance function in the tensor basis formed by the two given bases.
In case of using different bases, the left inverse of the transposed
matrix of coefficients of the covariance function is used instead.
Finally, the inner product between each pair of samples is calculated.

In case of knowing the matrix of coefficients of the covariance function
in the tensor basis formed by the two given bases, it can be passed as
an argument to avoid the calculation of it.

Args:
fdata1: First FDataBasis object.
fdata2: Second FDataBasis object.
cov_function: Covariance function as a callable. It is expected to
receive two arrays, s and t, and return the corresponding
covariance matrix.

Returns:
Matrix of the RKHS inner product between paired samples of
fdata1 and fdata2.
"""
_check_compatible_fdata(fdata1, fdata2)

if isinstance(cov_function, EmpiricalBasis):
cov_coeff_matrix = cov_function.coeff_matrix
else:
# Express the covariance function in the tensor basis
# NOTE: The alternative is to convert to FDatagrid the two FDataBasis
cov_coeff_matrix = _get_coeff_matrix(
cov_function,
fdata1.basis,
fdata2.basis,
)

if fdata1.basis == fdata2.basis:
inv_fd2_coefs = np.linalg.solve(
cov_coeff_matrix,
fdata2.coefficients.T,
)
else:
inv_fd2_coefs = np.linalg.lstsq(
cov_coeff_matrix.T,
fdata2.coefficients.T,
rcond=None,
)[0]

# Following einsum is equivalent to doing the matrix multiplication
# and then taking the diagonal of the result
return np.einsum( # type: ignore[no-any-return]
vnmabus marked this conversation as resolved.
Show resolved Hide resolved
"ij,ji->i",
fdata1.coefficients,
inv_fd2_coefs,
)
Loading