diff --git a/pymc/gp/__init__.py b/pymc/gp/__init__.py index 9510f33de4..995a5f9b87 100644 --- a/pymc/gp/__init__.py +++ b/pymc/gp/__init__.py @@ -22,3 +22,4 @@ MarginalKron, MarginalSparse, ) +from pymc.gp.hsgp import HSGP diff --git a/pymc/gp/cov.py b/pymc/gp/cov.py index 2894e7cd9e..fa6dfbe1fd 100644 --- a/pymc/gp/cov.py +++ b/pymc/gp/cov.py @@ -14,6 +14,7 @@ import warnings +from collections import Counter from functools import reduce from numbers import Number from operator import add, mul @@ -46,26 +47,21 @@ ] -class Covariance: - r""" - Base class for all kernels/covariance functions. +def _verify_scalar(value): + if isinstance(value, aesara.compile.SharedVariable) and value.get_value().squeeze().shape == (): + return at.squeeze(value) + elif np.asarray(value).squeeze().shape == (): + return np.squeeze(value) + elif isinstance(value, Number): + return value - Parameters - ---------- - input_dim: integer - The number of input dimensions, or columns of X (or Xs) - the kernel will operate on. - active_dims: List of integers - Indicate which dimension or column of X the covariance - function operates on. - """ + raise ValueError("A scalar value is required.") - def __init__(self, input_dim, active_dims=None): - self.input_dim = input_dim - if active_dims is None: - self.active_dims = np.arange(input_dim) - else: - self.active_dims = np.asarray(active_dims, int) + +class BaseCovariance: + """ + Base class for kernels/covariance functions. + """ def __call__(self, X, Xs=None, diag=False): r""" @@ -88,27 +84,18 @@ def __call__(self, X, Xs=None, diag=False): def diag(self, X): raise NotImplementedError - def full(self, X, Xs): + def full(self, X, Xs=None): raise NotImplementedError - def _slice(self, X, Xs): - xdims = X.shape[-1] - if isinstance(xdims, Variable): - xdims = xdims.eval() - if self.input_dim != xdims: - warnings.warn( - f"Only {self.input_dim} column(s) out of {xdims} are" - " being used to compute the covariance function. If this" - " is not intended, increase 'input_dim' parameter to" - " the number of columns to use. Ignore otherwise.", - UserWarning, - ) - X = at.as_tensor_variable(X[:, self.active_dims]) - if Xs is not None: - Xs = at.as_tensor_variable(Xs[:, self.active_dims]) - return X, Xs - def __add__(self, other): + # If it's a scalar, cast as Constant covariance. This allows validation for power spectral + # density calc. 
+        if isinstance(other, Number):
+            try:
+                other = Constant(c=_verify_scalar(other))
+            except ValueError:
+                pass
+        return Add([self, other])

     def __mul__(self, other):
@@ -121,19 +108,7 @@ def __rmul__(self, other):
         return self.__mul__(other)

     def __pow__(self, other):
-        if (
-            isinstance(other, aesara.compile.SharedVariable)
-            and other.get_value().squeeze().shape == ()
-        ):
-            other = at.squeeze(other)
-            return Exponentiated(self, other)
-        elif isinstance(other, Number):
-            return Exponentiated(self, other)
-        elif np.asarray(other).squeeze().shape == ():
-            other = np.squeeze(other)
-            return Exponentiated(self, other)
-
-        raise ValueError("A covariance function can only be exponentiated by a scalar value")
+        return Exponentiated(self, _verify_scalar(other))

     def __array_wrap__(self, result):
         """
@@ -150,41 +125,127 @@ def __array_wrap__(self, result):
         A = np.zeros((r, c))
         for i in range(r):
             for j in range(c):
-                A[i, j] = result[i, j].factor_list[1]
+                entry = result[i, j]._factor_list[1]
+                if isinstance(entry, Constant):
+                    # Counteract the elemwise Add edgecase
+                    entry = entry.c
+                A[i, j] = entry
         if isinstance(result[0][0], Add):
-            return result[0][0].factor_list[0] + A
+            return result[0][0]._factor_list[0] + A
         elif isinstance(result[0][0], Prod):
-            return result[0][0].factor_list[0] * A
+            return result[0][0]._factor_list[0] * A
         else:
             raise TypeError(
-                f"Unknown Covariance combination type {result[0][0]}. Known types are `Add` or `Prod`."
+                f"Unknown Covariance combination type {result[0][0]}. "
+                "Known types are `Add` or `Prod`."
             )


+class Covariance(BaseCovariance):
+    """
+    Base class for kernels/covariance functions that take `input_dim` and `active_dims`;
+    this excludes kernels like `Constant` and `WhiteNoise`.
+
+    Parameters
+    ----------
+    input_dim: integer
+        The number of input dimensions, or columns of X (or Xs)
+        the kernel will operate on.
+    active_dims: List of integers
+        Indicate which dimension or column of X the covariance
+        function operates on.
+    """
+
+    def __init__(self, input_dim, active_dims=None):
+        self.input_dim = input_dim
+        if active_dims is None:
+            self.active_dims = np.arange(input_dim)
+        else:
+            self.active_dims = np.asarray(active_dims, int)
+
+        if max(self.active_dims) >= self.input_dim:
+            raise ValueError("Values in `active_dims` can't be larger than or equal to `input_dim`.")
+
+    @property
+    def D(self):
+        """The dimensionality of the input, as taken from the
+        `active_dims`.
+        """
+        # Evaluate lazily in case this changes.
+        return len(self.active_dims)
+
+    def _slice(self, X, Xs=None):
+        xdims = X.shape[-1]
+        if isinstance(xdims, Variable):
+            xdims = xdims.eval()
+        if self.input_dim != xdims:
+            warnings.warn(
+                f"Only {self.input_dim} column(s) out of {xdims} are"
+                " being used to compute the covariance function. If this"
+                " is not intended, increase 'input_dim' parameter to"
+                " the number of columns to use. Ignore otherwise.",
+                UserWarning,
+            )
+        X = at.as_tensor_variable(X[:, self.active_dims])
+        if Xs is not None:
+            Xs = at.as_tensor_variable(Xs[:, self.active_dims])
+        return X, Xs
+
+
 class Combination(Covariance):
     def __init__(self, factor_list):
-        input_dim = max(
-            factor.input_dim for factor in factor_list if isinstance(factor, Covariance)
+        """Use the constituent factors to get `input_dim` and `active_dims` for the Combination covariance."""
+
+        # Check that all input_dim are the same in factor_list
+        input_dims = {factor.input_dim for factor in factor_list if isinstance(factor, Covariance)}
+
+        if len(input_dims) != 1:
+            raise ValueError("All covariances must have the same `input_dim`.")
+        input_dim = input_dims.pop()
+
+        # Union all active_dims sets in factor_list for the combination covariance
+        active_dims = np.sort(
+            np.asarray(
+                list(
+                    set.union(
+                        *[
+                            set(factor.active_dims)
+                            for factor in factor_list
+                            if isinstance(factor, Covariance)
+                        ]
+                    )
+                ),
+                dtype=int,
+            )
         )
-        super().__init__(input_dim=input_dim)
-        self.factor_list = []
+
+        super().__init__(input_dim=input_dim, active_dims=active_dims)
+
+        # Set up the combination kernel; flatten out factor_list so that nested
+        # combinations are merged into a single flat list
+        self._factor_list = []
         for factor in factor_list:
             if isinstance(factor, self.__class__):
-                self.factor_list.extend(factor.factor_list)
+                self._factor_list.extend(factor._factor_list)
             else:
-                self.factor_list.append(factor)
+                self._factor_list.append(factor)

-    def merge_factors(self, X, Xs=None, diag=False):
+    def _merge_factors_cov(self, X, Xs=None, diag=False):
+        """Called to evaluate either all the sums or all the
+        products of kernels that are possible to evaluate.
+        """
         factor_list = []
-        for factor in self.factor_list:
+        for factor in self._factor_list:
+
             # make sure diag=True is handled properly
-            if isinstance(factor, Covariance):
+            if isinstance(factor, BaseCovariance):
                 factor_list.append(factor(X, Xs, diag))
+
             elif isinstance(factor, np.ndarray):
                 if np.ndim(factor) == 2 and diag:
                     factor_list.append(np.diag(factor))
                 else:
                     factor_list.append(factor)
+
             elif isinstance(
                 factor,
                 (
@@ -193,23 +254,79 @@ def merge_factors(self, X, Xs=None, diag=False):
                     TensorSharedVariable,
                 ),
             ):
+
                 if factor.ndim == 2 and diag:
                     factor_list.append(at.diag(factor))
                 else:
                     factor_list.append(factor)
+
             else:
                 factor_list.append(factor)
+
         return factor_list
+
+    def _merge_factors_psd(self, omega):
+        """Called to evaluate spectral densities of combination kernels when possible. Implements
+        a more restricted set of rules than `_merge_factors_cov` -- just additivity of stationary
+        covariances with defined power spectral densities and multiplication by scalars. Also, the
+        active_dims for all covariances in the sum must be the same.
+        """
+        factor_list = []
+        for factor in self._factor_list:
+            if isinstance(factor, Covariance):
+
+                # Allow merging covariances for psd only if active_dims are the same
+                if set(self.active_dims) != set(factor.active_dims):
+                    raise ValueError(
+                        "For power spectral density calculations `active_dims` must be the same "
+                        "for all covariances in the sum."
+                    )
+
+                # If it's a covariance, try to calculate the psd
+                try:
+                    factor_list.append(factor.psd(omega))
+
+                except (AttributeError, NotImplementedError) as e:
+
+                    if isinstance(factor, Stationary):
+                        raise NotImplementedError(
+                            f"No power spectral density method has been implemented for {factor}."
+                        ) from e
+
+                    else:
+                        raise ValueError(
+                            "Power spectral densities, `.psd(omega)`, can only be calculated for "
+                            f"`Stationary` covariance functions. {factor} is non-stationary."
+                        ) from e
+
+            else:
+                # Otherwise defer the reduction to later
+                factor_list.append(factor)
+
+        return factor_list


 class Add(Combination):
     def __call__(self, X, Xs=None, diag=False):
-        return reduce(add, self.merge_factors(X, Xs, diag))
+        return reduce(add, self._merge_factors_cov(X, Xs, diag))
+
+    def psd(self, omega):
+        return reduce(add, self._merge_factors_psd(omega))


 class Prod(Combination):
     def __call__(self, X, Xs=None, diag=False):
-        return reduce(mul, self.merge_factors(X, Xs, diag))
+        return reduce(mul, self._merge_factors_cov(X, Xs, diag))
+
+    def psd(self, omega):
+        check = Counter([isinstance(factor, Covariance) for factor in self._factor_list])
+        if check.get(True, 0) >= 2:
+            raise NotImplementedError(
+                "The power spectral density of products of covariance "
+                "functions is not implemented."
+            )
+
+        return reduce(mul, self._merge_factors_psd(omega))


 class Exponentiated(Covariance):
@@ -242,7 +359,7 @@ def __init__(self, factor_list):
         self.input_dims = [factor.input_dim for factor in factor_list]
         input_dim = sum(self.input_dims)
         super().__init__(input_dim=input_dim)
-        self.factor_list = factor_list
+        self._factor_list = factor_list

     def _split(self, X, Xs):
         indices = np.cumsum(self.input_dims)
@@ -255,11 +372,11 @@ def _split(self, X, Xs):

     def __call__(self, X, Xs=None, diag=False):
         X_split, Xs_split = self._split(X, Xs)
-        covs = [cov(x, xs, diag) for cov, x, xs in zip(self.factor_list, X_split, Xs_split)]
+        covs = [cov(x, xs, diag) for cov, x, xs in zip(self._factor_list, X_split, Xs_split)]
         return reduce(mul, covs)


-class Constant(Covariance):
+class Constant(BaseCovariance):
     r"""
     Constant valued covariance function.

@@ -269,7 +386,6 @@ class Constant(Covariance):
     """

     def __init__(self, c):
-        super().__init__(1, None)
         self.c = c

     def diag(self, X):
@@ -292,7 +408,6 @@ class WhiteNoise(Covariance):
     """

     def __init__(self, sigma):
-        super().__init__(1, None)
         self.sigma = sigma

     def diag(self, X):
@@ -407,6 +522,9 @@ def diag(self, X):
     def full(self, X, Xs=None):
         raise NotImplementedError

+    def psd(self, omega):
+        raise NotImplementedError
+

 class Periodic(Stationary):
     r"""
@@ -450,12 +568,26 @@ class ExpQuad(Stationary):

    .. math::

       k(x, x') = \mathrm{exp}\left[ -\frac{(x - x')^2}{2 \ell^2} \right]
+
+    The power spectral density (`psd`) is:
+
+    .. math::
+
+       S(\boldsymbol\omega) =
+           \left( \sqrt{2 \pi} \right)^{D} \prod_{i=1}^{D}\ell_i
+           \exp\left( -\frac{1}{2} \sum_{i=1}^{D}\ell_i^2 \omega_i^{2} \right)
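+
+    A quick sketch of how the spectral density can be evaluated on a grid of frequencies
+    (the grid and lengthscale below are illustrative only):
+
+    .. code:: python
+
+        omega = np.linspace(0, 2, 50)[:, None]  # one column per active dimension
+        psd = pm.gp.cov.ExpQuad(1, ls=2.0).psd(omega).eval()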
     """

     def full(self, X, Xs=None):
         X, Xs = self._slice(X, Xs)
         return at.exp(-0.5 * self.square_dist(X, Xs))

+    def psd(self, omega):
+        ls = at.ones(self.D) * self.ls
+        c = at.power(at.sqrt(2.0 * np.pi), self.D)
+        exp = at.exp(-0.5 * at.dot(at.square(omega), at.square(ls)))
+        return c * at.prod(ls) * exp
+

 class RatQuad(Stationary):
     r"""
@@ -487,6 +619,16 @@ class Matern52(Stationary):

       k(x, x') = \left(1 + \frac{\sqrt{5(x - x')^2}}{\ell} +
                  \frac{5(x-x')^2}{3\ell^2}\right)
                  \mathrm{exp}\left[ - \frac{\sqrt{5(x - x')^2}}{\ell} \right]
+
+    The power spectral density is:
+
+    .. math::
+
+       S(\boldsymbol\omega) =
+           \frac{2^{D} \pi^{\frac{D}{2}} \Gamma\left( \frac{D+5}{2} \right) 5^{5/2}}
+                {\frac{3}{4}\sqrt{\pi}}
+           \prod_{i=1}^{D}\ell_{i}
+           \left( 5 + \sum_{i=1}^{D}\ell_{i}^2 \boldsymbol\omega_{i}^{2} \right)^{-\frac{D+5}{2}}
     """

     def full(self, X, Xs=None):
@@ -494,6 +636,14 @@ def full(self, X, Xs=None):
         X, Xs = self._slice(X, Xs)
         r = self.euclidean_dist(X, Xs)
         return (1.0 + np.sqrt(5.0) * r + 5.0 / 3.0 * at.square(r)) * at.exp(-1.0 * np.sqrt(5.0) * r)

+    def psd(self, omega):
+        ls = at.ones(self.D) * self.ls
+        D52 = (self.D + 5) / 2
+        num = at.power(2, self.D) * at.power(np.pi, self.D / 2) * at.gamma(D52) * at.power(5, 5 / 2)
+        den = 0.75 * at.sqrt(np.pi)
+        pow = at.power(5.0 + at.dot(at.square(omega), at.square(ls)), -1 * D52)
+        return (num / den) * at.prod(ls) * pow
+

 class Matern32(Stationary):
     r"""
@@ -503,6 +653,16 @@ class Matern32(Stationary):

       k(x, x') = \left(1 + \frac{\sqrt{3(x - x')^2}}{\ell}\right)
                  \mathrm{exp}\left[ - \frac{\sqrt{3(x - x')^2}}{\ell} \right]
+
+    The power spectral density is:
+
+    .. math::
+
+       S(\boldsymbol\omega) =
+           \frac{2^{D} \pi^{D/2} \Gamma\left( \frac{D+3}{2} \right) 3^{3/2}}
+                {\frac{1}{2}\sqrt{\pi}}
+           \prod_{i=1}^{D}\ell_{i}
+           \left( 3 + \sum_{i=1}^{D}\ell_{i}^2 \boldsymbol\omega_{i}^{2} \right)^{-\frac{D+3}{2}}
     """

     def full(self, X, Xs=None):
@@ -510,6 +670,14 @@ def full(self, X, Xs=None):
         X, Xs = self._slice(X, Xs)
         r = self.euclidean_dist(X, Xs)
         return (1.0 + np.sqrt(3.0) * r) * at.exp(-np.sqrt(3.0) * r)

+    def psd(self, omega):
+        ls = at.ones(self.D) * self.ls
+        D32 = (self.D + 3) / 2
+        num = at.power(2, self.D) * at.power(np.pi, self.D / 2) * at.gamma(D32) * at.power(3, 3 / 2)
+        den = 0.5 * at.sqrt(np.pi)
+        pow = at.power(3.0 + at.dot(at.square(omega), at.square(ls)), -1 * D32)
+        return (num / den) * at.prod(ls) * pow
+

 class Matern12(Stationary):
     r"""
diff --git a/pymc/gp/hsgp.py b/pymc/gp/hsgp.py
new file mode 100644
index 0000000000..ba7146f781
--- /dev/null
+++ b/pymc/gp/hsgp.py
@@ -0,0 +1,229 @@
+# Copyright 2022 The PyMC Developers
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import warnings
+
+from typing import Optional
+
+import aesara.tensor as at
+import numpy as np
+
+import pymc as pm
+
+from pymc.gp.cov import Covariance
+from pymc.gp.gp import Base
+from pymc.gp.mean import Mean, Zero
+
+
+class HSGP(Base):
+    R"""
+    Hilbert Space Gaussian process
+
+    The `gp.HSGP` class is an implementation of the Hilbert Space Gaussian process.  This
+    approximation is a linear model that uses a fixed set of basis vectors, whose coefficients
+    are Gaussian with variances given by the power spectral density of a stationary covariance
+    function.  Like `gp.Latent`, it does not assume a Gaussian noise model and can be used with
+    any likelihood, or as a component anywhere within a model.  Also like `gp.Latent`, it has
+    `prior` and `conditional` methods.  It additionally has an `approx_K` method, which returns
+    the approximate covariance matrix.  It supports a limited subset of additive covariances
+    (sums of covariances over the same `active_dims`).
+
+    For information on choosing appropriate `m`, `L`, and `c`, refer to Ruitort-Mayol et al.
+    or to the PyMC examples documentation.
+
+    Parameters
+    ----------
+    m: list
+        The number of basis vectors to use for each active dimension (covariance parameter
+        `active_dims`).
+    L: list
+        The boundary of the space for each `active_dim`.  It is called the boundary condition.
+        Choose L such that the domain `[-L, L]` contains all points in the column of X given by
+        the `active_dim`.
+    c: float
+        The proportion extension factor.  Used to construct L from X.  Defined as `S = max|X|`
+        such that `X` is in `[-S, S]`.  `L` is then calculated as `c * S`.  For example, if
+        `max|X| = 5` and `c = 1.5`, then `L = 7.5`.  One of `c` or `L` must be provided.
+        Further information can be found in Ruitort-Mayol et al.
+    drop_first: bool
+        Default `False`.  Sometimes the first basis vector is quite "flat" and very similar to
+        the intercept term.  When there is an intercept in the model, ignoring the first basis
+        vector may improve sampling.
+    cov_func: instance of Covariance
+        The covariance function.  It must be a covariance with an implemented power spectral
+        density, `psd` (such as `ExpQuad`, `Matern32`, or `Matern52`), or a sum of such
+        covariances.
+    mean_func: instance of Mean
+        The mean function.  Defaults to zero.
+
+    Examples
+    --------
+    .. code:: python
+
+        # A matrix of 100 three-dimensional input points.
+        X = np.random.randn(100, 3)
+
+        with pm.Model() as model:
+            # Specify the covariance function.  Three input dimensions, but we only want to
+            # use the last two.
+            cov_func = pm.gp.cov.ExpQuad(3, ls=0.1, active_dims=[1, 2])
+
+            # Specify the HSGP.  Use 50 basis vectors across each active dimension, [1, 2],
+            # for a total of 50 * 50 = 2500.  The range of the data is inferred from X, and
+            # the boundary condition multiplier `c` uses 4 * half range.
+            gp = pm.gp.HSGP(m=[50, 50], c=4.0, cov_func=cov_func)
+
+            # Place a GP prior over the function f.
+            f = gp.prior("f", X=X)
+
+        ...
+
+        # After fitting or sampling, specify the distribution at new points with
+        # .conditional.  Xnew must have the same number of columns as X.
+        Xnew = np.random.randn(50, 3)
+
+        with model:
+            fcond = gp.conditional("fcond", Xnew=Xnew)
+
+    References
+    ----------
+    -   Ruitort-Mayol, G., Anderson, M., Solin, A., Vehtari, A. (2022).  Practical
+        Hilbert Space Approximate Bayesian Gaussian Processes for Probabilistic Programming.
+
+    -   Solin, A., Sarkka, S. (2019).  Hilbert Space Methods for Reduced-Rank Gaussian
+        Process Regression.
+    """
+
+    def __init__(
+        self,
+        m: list,
+        L: Optional[list] = None,
+        c: float = 1.5,
+        drop_first: bool = False,
+        *,
+        mean_func: Mean = Zero(),
+        cov_func: Optional[Covariance] = None,
+    ):
+        arg_err_msg = (
+            "`m` and, if provided, `L` must be lists or tuples with one element per active "
+            "dimension of the kernel or covariance function."
+        )
+        try:
+            if len(m) != cov_func.D:
+                raise ValueError(arg_err_msg)
+        except TypeError as e:
+            raise ValueError(arg_err_msg) from e
+
+        if L is not None and len(L) != cov_func.D:
+            raise ValueError(arg_err_msg)
+
+        if L is None and c < 1.2:
+            warnings.warn(
+                "Most applications will require `c >= 1.2` for accuracy at the boundaries of "
+                "the domain."
+            )
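+
+        # Store the basis configuration.  `L` may still be None at this point; if so, it is
+        # constructed from `X` and `c` by `_set_boundary` when `prior` is called.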
+ ) + + self.drop_first = drop_first + self.m = m + self.L = L + self.c = c + self.D = cov_func.D + + super().__init__(mean_func=mean_func, cov_func=cov_func) + + def __add__(self, other): + raise NotImplementedError("Additive HSGPs aren't supported ") + + def _set_boundary(self, X): + """Make L from X and c if L is not passed in.""" + if self.L is None: + # Define new L based on c and X range + La = at.abs(at.min(X, axis=0)) + Lb = at.abs(at.max(X, axis=0)) + self.L = self.c * at.max(at.stack((La, Lb)), axis=0) + else: + self.L = at.as_tensor_variable(self.L) + + @staticmethod + def _eigendecomposition(X, L, m, D): + """Construct the eigenvalues and eigenfunctions of the Laplace operator.""" + m_star = at.prod(m) + S = np.meshgrid(*[np.arange(1, 1 + m[d]) for d in range(D)]) + S = np.vstack([s.flatten() for s in S]).T + eigvals = at.square((np.pi * S) / (2 * L)) + phi = at.ones((X.shape[0], m_star)) + for d in range(D): + c = 1.0 / np.sqrt(L[d]) + phi *= c * at.sin(at.sqrt(eigvals[:, d]) * (at.tile(X[:, d][:, None], m_star) + L[d])) + omega = at.sqrt(eigvals) + return omega, phi, m_star + + def approx_K(self, X, L, m): + """A helper function which gives the approximate kernel or covariance matrix K. This can be + helpful when trying to see how well an approximation may work. + """ + X, _ = self.cov_func._slice(X) + omega, phi, _ = self._eigendecomposition(X, self.L, self.m, self.cov_func.D) + psd = self.cov_func.psd(omega) + return at.dot(phi * psd, at.transpose(phi)) + + def prior(self, name, X, dims=None): + R""" + Returns the (approximate) GP prior distribution evaluated over the input locations `X`. + + Parameters + ---------- + name: string + Name of the random variable + X: array-like + Function input values. + dims: None + Dimension name for the GP random variable. + """ + + X, _ = self.cov_func._slice(X) + self._set_boundary(X) + omega, phi, m_star = self._eigendecomposition(X, self.L, self.m, self.D) + psd = self.cov_func.psd(omega) + + if self.drop_first: + self.beta = pm.Normal(f"{name}_coeffs_", size=m_star - 1) + self.f = pm.Deterministic( + name, self.mean_func(X) + at.squeeze(at.dot(phi[:, 1:], self.beta * at.sqrt(psd[1:]))), dims=dims + ) + else: + self.beta = pm.Normal(f"{name}_coeffs_", size=m_star) + self.f = pm.Deterministic( + name, self.mean_func(X) + at.squeeze(at.dot(phi, self.beta * at.sqrt(psd))), dims=dims + ) + return self.f + + def _build_conditional(self, name, Xnew): + Xnew, _ = self.cov_func._slice(Xnew) + omega, phi, _ = self._eigendecomposition(Xnew, self.L, self.m, self.D) + psd = self.cov_func.psd(omega) + return self.mean_func(Xnew) + at.squeeze(at.dot(phi, self.beta * psd)) + + def conditional(self, name, Xnew, dims=None): + R""" + Returns the (approximate) conditional distribution evaluated over new input locations + `Xnew`. + + Parameters + ---------- + name: string + Name of the random variable + Xnew: array-like + Function input values. + dims: None + Dimension name for the GP random variable. 
+ """ + fnew = self._build_conditional(name, Xnew) + return pm.Deterministic(name, fnew, dims=dims) diff --git a/pymc/tests/gp/test_cov.py b/pymc/tests/gp/test_cov.py index 550f68727d..5acdfb1d82 100644 --- a/pymc/tests/gp/test_cov.py +++ b/pymc/tests/gp/test_cov.py @@ -181,6 +181,61 @@ def test_inv_rightprod(self): cov = M + pm.gp.cov.ExpQuad(1, 1.0) +class TestCovPSD: + def test_covpsd_add(self): + L = 10.0 + omega = np.pi * np.arange(1, 101) / (2 * L) + with pm.Model() as model: + cov1 = 2 * pm.gp.cov.ExpQuad(1, 0.1) + cov2 = 5 * pm.gp.cov.ExpQuad(1, 1.0) + cov = cov1 + cov2 + psd1 = cov1.psd(omega[:, None]).eval() + psd2 = cov2.psd(omega[:, None]).eval() + psd = cov.psd(omega[:, None]).eval() + npt.assert_allclose(psd, psd1 + psd2) + + def test_copsd_multiply(self): + # This could be implemented via convolution + L = 10.0 + omega = np.pi * np.arange(1, 101) / (2 * L) + with pm.Model() as model: + cov1 = 2 * pm.gp.cov.ExpQuad(1, ls=1) + cov2 = pm.gp.cov.ExpQuad(1, ls=1) + + with pytest.raises(NotImplementedError): + psd = (cov1 * cov2).psd(omega[:, None]).eval() + + def test_covpsd_nonstationary1(self): + L = 10.0 + omega = np.pi * np.arange(1, 101) / (2 * L) + with pm.Model() as model: + cov = 2 * pm.gp.cov.Linear(1, c=5) + + with pytest.raises(ValueError): + psd = cov.psd(omega[:, None]).eval() + + def test_covpsd_nonstationary2(self): + L = 10.0 + omega = np.pi * np.arange(1, 101) / (2 * L) + with pm.Model() as model: + cov = 2 * pm.gp.cov.ExpQuad(1, ls=1) + 10.0 + + with pytest.raises(ValueError): + psd = cov.psd(omega[:, None]).eval() + + def test_covpsd_notimplemented(self): + class NewStationaryCov(pm.gp.cov.Stationary): + pass + + L = 10.0 + omega = np.pi * np.arange(1, 101) / (2 * L) + with pm.Model() as model: + cov = 2 * NewStationaryCov(1, ls=1) + + with pytest.raises(NotImplementedError): + psd = cov.psd(omega[:, None]).eval() + + class TestCovExponentiation: def test_symexp_cov(self): X = np.linspace(0, 1, 10)[:, None] @@ -228,7 +283,7 @@ def test_covexp_shared(self): def test_invalid_covexp(self): X = np.linspace(0, 1, 10)[:, None] - with pytest.raises(ValueError, match=r"can only be exponentiated by a scalar value"): + with pytest.raises(ValueError, match=r"scalar value is required"): with pm.Model() as model: a = np.array([[1.0, 2.0]]) cov = pm.gp.cov.ExpQuad(1, 0.1) ** a @@ -262,7 +317,7 @@ def test_multiops(self): + pm.gp.cov.ExpQuad(1, 0.1) + pm.gp.cov.ExpQuad(1, 0.1) * pm.gp.cov.ExpQuad(1, 0.1) ) - cov2 = pm.gp.cov.ExpQuad(1, 0.1) * pm.gp.cov.ExpQuad(2, 0.1) + cov2 = pm.gp.cov.ExpQuad(2, 0.1) * pm.gp.cov.ExpQuad(2, 0.1) cov = pm.gp.cov.Kron([cov1, cov2]) K_true = kronecker(cov1(X1).eval(), cov2(X2).eval()).eval() K = cov(X).eval() @@ -373,6 +428,15 @@ def test_inv_lengthscale(self): Kd = cov(X, diag=True).eval() npt.assert_allclose(np.diag(K), Kd, atol=1e-5) + def test_psd(self): + # compare to simple 1d formula + X = np.linspace(0, 1, 10)[:, None] + omega = np.linspace(0, 2, 50) + ell = 2.0 + true_1d_psd = np.sqrt(2 * np.pi * np.square(ell)) * np.exp(-0.5 * np.square(ell * omega)) + test_1d_psd = pm.gp.cov.ExpQuad(1, ls=ell).psd(omega[:, None]).flatten().eval() + npt.assert_allclose(true_1d_psd, test_1d_psd, atol=1e-5) + class TestWhiteNoise: def test_1d(self): @@ -449,6 +513,16 @@ def test_1d(self): Kd = cov(X, diag=True).eval() npt.assert_allclose(np.diag(K), Kd, atol=1e-5) + def test_psd(self): + # compare to simple 1d formula + X = np.linspace(0, 1, 10)[:, None] + omega = np.linspace(0, 2, 50) + ell = 2.0 + lamda = np.sqrt(5) / ell + true_1d_psd = (16.0 / 3.0) 
+        test_1d_psd = pm.gp.cov.Matern52(1, ls=ell).psd(omega[:, None]).flatten().eval()
+        npt.assert_allclose(true_1d_psd, test_1d_psd, atol=1e-5)
+

 class TestMatern32:
     def test_1d(self):
@@ -463,6 +537,16 @@ def test_1d(self):
         Kd = cov(X, diag=True).eval()
         npt.assert_allclose(np.diag(K), Kd, atol=1e-5)

+    def test_psd(self):
+        # compare to simple 1d formula
+        X = np.linspace(0, 1, 10)[:, None]
+        omega = np.linspace(0, 2, 50)
+        ell = 2.0
+        lamda = np.sqrt(3) / ell
+        true_1d_psd = 4 * np.power(lamda, 3) * np.power(lamda**2 + omega**2, -2)
+        test_1d_psd = pm.gp.cov.Matern32(1, ls=ell).psd(omega[:, None]).flatten().eval()
+        npt.assert_allclose(true_1d_psd, test_1d_psd, atol=1e-5)
+

 class TestMatern12:
     def test_1d(self):
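
A minimal end-to-end sketch of the new API (the input range, `m=[200]`, `c=1.5`, and the
fixed lengthscale below are illustrative choices, not defaults):

.. code:: python

    import numpy as np
    import pymc as pm

    X = np.linspace(0, 10, 100)[:, None]

    with pm.Model() as model:
        # Any covariance with a `psd` method works here: ExpQuad, Matern32, or Matern52.
        cov_func = pm.gp.cov.Matern52(1, ls=2.0)

        # 200 basis vectors; the boundary L is inferred from X as c * max|X|.
        gp = pm.gp.HSGP(m=[200], c=1.5, cov_func=cov_func)
        f = gp.prior("f", X=X)

        # ... add a likelihood over f and sample; then extend to new points:
        fcond = gp.conditional("fcond", Xnew=np.linspace(10, 12, 20)[:, None])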