diff --git a/pymc_experimental/bayesian_estimator_linearmodel.py b/pymc_experimental/bayesian_estimator_linearmodel.py new file mode 100644 index 000000000..4feaf2440 --- /dev/null +++ b/pymc_experimental/bayesian_estimator_linearmodel.py @@ -0,0 +1,471 @@ +import json +from abc import abstractmethod +from pathlib import Path +from typing import Any, Dict, Union + +import arviz as az +import numpy as np +import pandas as pd +import pymc as pm +import xarray as xr +from pymc.util import RandomState + +from pymc_experimental.model_builder import ModelBuilder + +# If scikit-learn is available, use its data validator +try: + from sklearn.utils.validation import check_array, check_X_y +# If scikit-learn is not available, return the data unchanged +except ImportError: + + def check_X_y(X, y, **kwargs): + return X, y + + def check_array(X, **kwargs): + return X + + +class BayesianEstimator(ModelBuilder): + """ + Base class similar to ModelBuilder but customized for integration with a scikit-learn workflow. + It is designed to encapsulate parameter inference ("fit") and posterior prediction ("predict") + for simple models with the following characteristics: + - One or more input features for each observation + - One observed output feature for each observation + + Estimators derived from this base class can utilize scikit-learn transformers for input and + output accessed via `fit` and `predict`. (`TransformedTargetRegressor` would need to be extended + in order to transform the output of `predict_proba` or `predict_posterior`.) + + Example scikit-learn usage: + >>> from sklearn.pipeline import Pipeline + >>> from sklearn.compose import TransformedTargetRegressor + >>> model = Pipeline([ + >>> ('input_scaling', StandardScaler()), + >>> ('linear_model', TransformedTargetRegressor(LinearModel(model_config), + >>> transformer=StandardScaler()),) + >>> ]) + >>> model.fit(X_obs, y_obs) + >>> y_pred = model.predict(X_pred) + + The format for probabilistic output forecasts is currently an xarray `DataSet` of the posterior + prediction draws for each input prediction. This + + The `sklearn` package is not a dependency for using this class, although if imports from `sklearn` + are successful, then scikit-learn's data validation functions are used to accept "array-like" input. + """ + + model_type = "BaseClass" + version = "None" + + def __init__( + self, + model_config: Dict = None, + sampler_config: Dict = None, + ): + """ + Initializes model configuration and sampler configuration for the model + + Parameters + ---------- + model_config : dict + Parameters that initialise model configuration + sampler_config : dict + Parameters that initialise sampler configuration + """ + if model_config is None: + model_config = self.default_model_config + self.model_config = model_config + + if sampler_config is None: + sampler_config = self.default_sampler_config + self.sampler_config = sampler_config + + self.model = None # Set by build_model + self.output_var = None # Set by build_model + self.idata = None # idata is generated during fitting + self.is_fitted_ = False + + @property + @abstractmethod + def default_model_config(self): + raise NotImplementedError + + @property + @abstractmethod + def default_sampler_config(self): + raise NotImplementedError + + def _validate_data(self, X, y=None): + if y is not None: + return check_X_y(X, y, accept_sparse=False, y_numeric=True, multi_output=False) + else: + return check_array(X, accept_sparse=False) + + @abstractmethod + def build_model(self) -> None: + """ + Build the PYMC model. The model is built with placeholder data. + Actual data will be set by _data_setter when fitting or evaluating the model. + Data array size can change but number of dimensions must stay the same. + + Returns: + ---------- + None + """ + raise NotImplementedError + + @abstractmethod + def _data_setter(self, X, y=None): + """ + Sets new data in the model. + + Parameters + ---------- + X : array, shape (n_obs, n_features) + The training input samples. + y : array, shape (n_obs,) + The target values (real numbers). + + Returns: + ---------- + None + + """ + + raise NotImplementedError + + def fit( + self, + X: Union[np.ndarray, pd.DataFrame, pd.Series], + y: Union[np.ndarray, pd.DataFrame, pd.Series], + progressbar: bool = True, + random_seed: RandomState = None, + **kwargs: Any, + ) -> "BayesianEstimator": + """ + Fit a model using the data passed as a parameter. + Sets attrs to inference data of the model. + + + Parameters + ---------- + X : array-like if sklearn is available, otherwise array, shape (n_obs, n_features) + The training input samples. + y : array-like if sklearn is available, otherwise array, shape (n_obs,) + The target values (real numbers). + progressbar : bool + Specifies whether the fit progressbar should be displayed + random_seed : RandomState + Provides sampler with initial random seed for obtaining reproducible samples + **kwargs : Any + Custom sampler settings can be provided in form of keyword arguments. + + Returns + ------- + self : object + Returns self. + """ + + X, y = self._validate_data(X, y) + + self.build_model() + self._data_setter(X, y) + + sampler_config = self.sampler_config.copy() + sampler_config["progressbar"] = progressbar + sampler_config["random_seed"] = random_seed + sampler_config.update(**kwargs) + + self.idata = self.sample_model(**sampler_config) + return self + + def predict( + self, + X_pred: Union[np.ndarray, pd.DataFrame, pd.Series], + extend_idata: bool = True, + ) -> np.ndarray: + """ + Uses model to predict on unseen data and return point prediction of all the samples. The point prediction + for each input row is the expected output value, computed as the mean of MCMC samples. + + Parameters + --------- + X_pred : array-like if sklearn is available, otherwise array, shape (n_pred, n_features) + The input data used for prediction. + extend_idata : Boolean determining whether the predictions should be added to inference data object. + Defaults to True. + + Returns + ------- + y_pred : ndarray, shape (n_pred,) + Predicted output corresponding to input X_pred. + """ + if not hasattr(self, "output_var"): + raise NotImplementedError(f"Subclasses of {__class__} should set self.output_var") + + X_pred = self._validate_data(X_pred) + + posterior_predictive_samples = self.sample_posterior_predictive( + X_pred, extend_idata, combined=False + ) + + if self.output_var not in posterior_predictive_samples: + raise KeyError( + f"Output variable {self.output_var} not found in posterior predictive samples." + ) + + posterior_means = posterior_predictive_samples[self.output_var].mean( + dim=["chain", "draw"], keep_attrs=False + ) + return posterior_means.data + + def predict_posterior( + self, + X_pred: Union[np.ndarray, pd.DataFrame, pd.Series], + extend_idata: bool = True, + combined: bool = True, + ) -> xr.DataArray: + """ + Generate posterior predictive samples on unseen data. + + Parameters + --------- + X_pred : array-like if sklearn is available, otherwise array, shape (n_pred, n_features) + The input data used for prediction. + extend_idata : Boolean determining whether the predictions should be added to inference data object. + Defaults to True. + combined: Combine chain and draw dims into sample. Won't work if a dim named sample already exists. + Defaults to True. + + Returns + ------- + y_pred : DataArray, shape (n_pred, chains * draws) if combined is True, otherwise (chains, draws, n_pred) + Posterior predictive samples for each input X_pred + """ + if not hasattr(self, "output_var"): + raise NotImplementedError(f"Subclasses of {__class__} should set self.output_var") + + X_pred = self._validate_data(X_pred) + posterior_predictive_samples = self.sample_posterior_predictive( + X_pred, extend_idata, combined + ) + + if self.output_var not in posterior_predictive_samples: + raise KeyError( + f"Output variable {self.output_var} not found in posterior predictive samples." + ) + + return posterior_predictive_samples[self.output_var] + + def predict_proba( + self, + X_pred: Union[np.ndarray, pd.DataFrame, pd.Series], + extend_idata: bool = True, + combined: bool = False, + ) -> xr.DataArray: + """Alias for `predict_posterior`, for consistency with scikit-learn probabilistic estimators.""" + return self.predict_posterior(X_pred, extend_idata, combined) + + def sample_prior_predictive( + self, X_pred, samples: int = None, extend_idata: bool = False, combined: bool = True + ): + """ + Sample from the model's prior predictive distribution. + + Parameters + --------- + X_pred : array, shape (n_pred, n_features) + The input data used for prediction using prior distribution. + samples : int + Number of samples from the prior parameter distributions to generate. + If not set, uses sampler_config['draws'] if that is available, otherwise defaults to 500. + extend_idata : Boolean determining whether the predictions should be added to inference data object. + Defaults to False. + combined: Combine chain and draw dims into sample. Won't work if a dim named sample already exists. + Defaults to True. + + Returns + ------- + prior_predictive_samples : DataArray, shape (n_pred, samples) + Prior predictive samples for each input X_pred + """ + if samples is None: + samples = self.sampler_config.get("draws", 500) + + if self.model is None: + self.build_model() + + self._data_setter(X_pred) + + with self.model: # sample with new input data + prior_pred = pm.sample_prior_predictive(samples) + self.set_idata_attrs(prior_pred) + if extend_idata: + if self.idata is not None: + self.idata.extend(prior_pred) + else: + self.idata = prior_pred + + prior_predictive_samples = az.extract(prior_pred, "prior_predictive", combined=combined) + + return prior_predictive_samples + + def sample_posterior_predictive(self, X_pred, extend_idata, combined): + """ + Sample from the model's posterior predictive distribution. + + Parameters + --------- + X_pred : array, shape (n_pred, n_features) + The input data used for prediction using prior distribution.. + extend_idata : Boolean determining whether the predictions should be added to inference data object. + Defaults to False. + combined: Combine chain and draw dims into sample. Won't work if a dim named sample already exists. + Defaults to True. + + Returns + ------- + posterior_predictive_samples : DataArray, shape (n_pred, samples) + Posterior predictive samples for each input X_pred + """ + self._data_setter(X_pred) + + with self.model: # sample with new input data + post_pred = pm.sample_posterior_predictive(self.idata) + if extend_idata: + self.idata.extend(post_pred) + + posterior_predictive_samples = az.extract( + post_pred, "posterior_predictive", combined=combined + ) + + return posterior_predictive_samples + + @classmethod + def load(cls, fname: str): + """ + Creates a BayesianEstimator instance from a file, + Loads inference data for the model. + + Parameters + ---------- + fname : string + This denotes the name with path from where idata should be loaded from. + + Returns + ------- + Returns an instance of BayesianEstimator. + + Raises + ------ + ValueError + If the inference data that is loaded doesn't match with the model. + """ + + filepath = Path(str(fname)) + idata = az.from_netcdf(filepath) + model = cls( + model_config=json.loads(idata.attrs["model_config"]), + sampler_config=json.loads(idata.attrs["sampler_config"]), + ) + model.idata = idata + model.build_model() + # All previously used data is in idata. + + if model.id != idata.attrs["id"]: + raise ValueError( + f"The file '{fname}' does not contain an inference data of the same model or configuration as '{cls._model_type}'" + ) + + return model + + @property + def _serializable_model_config(self): + return self.model_config + + def get_params(self, deep=True): + """ + Get all the model parameters needed to instantiate a copy of the model, not including training data. + """ + return {"model_config": self.model_config, "sampler_config": self.sampler_config} + + def set_params(self, **params): + """ + Set all the model parameters needed to instantiate the model, not including training data. + """ + self.model_config = params["model_config"] + self.sampler_config = params["sampler_config"] + + +class LinearModel(BayesianEstimator): + """ + This class is an implementation of a single-input linear regression model in PYMC using the + BayesianEstimator base class for interoperability with scikit-learn. + """ + + _model_type = "LinearModel" + version = "0.1" + + @property + def default_model_config(self): + return { + "intercept": {"loc": 0, "scale": 10}, + "slope": {"loc": 0, "scale": 10}, + "obs_error": 2, + } + + @property + def default_sampler_config(self): + return { + "draws": 1_000, + "tune": 1_000, + "chains": 3, + "target_accept": 0.95, + } + + def build_model(self): + cfg = self.model_config + + # The model is built with placeholder data. + # Actual data will be set by _data_setter when fitting or evaluating the model. + # Data array size can change but number of dimensions must stay the same. + with pm.Model() as self.model: + x = pm.MutableData("x", np.zeros((1,)), dims="observation") + y_data = pm.MutableData("y_data", np.zeros((1,)), dims="observation") + + # priors + intercept = pm.Normal( + "intercept", cfg["intercept"]["loc"], sigma=cfg["intercept"]["scale"] + ) + slope = pm.Normal("slope", cfg["slope"]["loc"], sigma=cfg["slope"]["scale"]) + obs_error = pm.HalfNormal("σ_model_fmc", cfg["obs_error"]) + + # Model + y_model = pm.Deterministic("y_model", intercept + slope * x, dims="observation") + + # observed data + y_hat = pm.Normal( + "y_hat", + y_model, + sigma=obs_error, + shape=x.shape, + observed=y_data, + dims="observation", + ) + self.output_var = "y_hat" + + def _data_setter(self, X, y=None): + with self.model: + pm.set_data({"x": X[:, 0]}) + if y is not None: + pm.set_data({"y_data": y.squeeze()}) + + @classmethod + def create_sample_input(cls, nsamples=100): + x = np.linspace(start=0, stop=1, num=nsamples) + y = 5 * x + 3 + y = y + np.random.normal(0, 1, len(x)) + + x = np.expand_dims(x, -1) # scikit assumes a dimension for features. + return x, y diff --git a/pymc_experimental/model_builder.py b/pymc_experimental/model_builder.py index 096109ee8..48f168d8d 100644 --- a/pymc_experimental/model_builder.py +++ b/pymc_experimental/model_builder.py @@ -176,6 +176,33 @@ def build_model( """ raise NotImplementedError + def sample_model(self, **kwargs): + + if self.model is None: + raise RuntimeError( + "The model hasn't been built yet, call .build_model() first or call .fit() instead." + ) + + with self.model: + sampler_args = {**self.sampler_config, **kwargs} + idata = pm.sample(**sampler_args) + idata.extend(pm.sample_prior_predictive()) + idata.extend(pm.sample_posterior_predictive(idata)) + + self.set_idata_attrs(idata) + return idata + + def set_idata_attrs(self, idata=None): + if idata is None: + idata = self.idata + if idata is None: + raise RuntimeError("No idata provided to set attrs on.") + idata.attrs["id"] = self.id + idata.attrs["model_type"] = self._model_type + idata.attrs["version"] = self.version + idata.attrs["sampler_config"] = json.dumps(self.sampler_config) + idata.attrs["model_config"] = json.dumps(self._serializable_model_config) + def save(self, fname: str) -> None: """ Saves inference data of the model. @@ -195,7 +222,7 @@ def save(self, fname: str) -> None: >>> name = './mymodel.nc' >>> model.save(name) """ - if self.idata is not None and "fit_data" in self.idata: + if self.idata is not None and "posterior" in self.idata: file = Path(str(fname)) self.idata.to_netcdf(file) else: @@ -232,9 +259,9 @@ def load(cls, fname: str): filepath = Path(str(fname)) idata = az.from_netcdf(filepath) model_builder = cls( + data=idata.fit_data.to_dataframe(), model_config=json.loads(idata.attrs["model_config"]), sampler_config=json.loads(idata.attrs["sampler_config"]), - data=idata.fit_data.to_dataframe(), ) model_builder.idata = idata model_builder.build_model(model_builder.data, model_builder.model_config) @@ -294,16 +321,7 @@ def fit( sampler_config["progressbar"] = progressbar sampler_config["random_seed"] = random_seed - with self.model: - self.idata = pm.sample(**self.sampler_config, **kwargs) - self.idata.extend(pm.sample_prior_predictive()) - self.idata.extend(pm.sample_posterior_predictive(self.idata)) - - self.idata.attrs["id"] = self.id - self.idata.attrs["model_type"] = self._model_type - self.idata.attrs["version"] = self.version - self.idata.attrs["sampler_config"] = json.dumps(self.sampler_config) - self.idata.attrs["model_config"] = json.dumps(self._serializable_model_config) + self.idata = self.sample_model(**sampler_config) self.idata.add_groups(fit_data=self.data.to_xarray()) return self.idata diff --git a/pymc_experimental/tests/test_bayesian_estimator_linearmodel.py b/pymc_experimental/tests/test_bayesian_estimator_linearmodel.py new file mode 100644 index 000000000..9fc9b5754 --- /dev/null +++ b/pymc_experimental/tests/test_bayesian_estimator_linearmodel.py @@ -0,0 +1,171 @@ +# Copyright 2023 The PyMC Developers +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import hashlib +import sys +import tempfile + +import numpy as np +import pytest +import xarray as xr + +from pymc_experimental.bayesian_estimator_linearmodel import LinearModel + +try: + from sklearn.compose import TransformedTargetRegressor + from sklearn.pipeline import Pipeline + from sklearn.preprocessing import StandardScaler + + sklearn_available = True +except ImportError: + sklearn_available = False + + +@pytest.fixture(scope="module") +def sample_input(): + x, y = LinearModel.create_sample_input() + return x, y + + +@pytest.fixture(scope="module") +def fitted_linear_model_instance(sample_input): + sampler_config = { + "draws": 500, + "tune": 300, + "chains": 2, + "target_accept": 0.95, + } + x, y = sample_input + model = LinearModel() + model.fit(x, y) + return model + + +def test_save_without_fit_raises_runtime_error(): + x, y = LinearModel.create_sample_input() + test_model = LinearModel() + with pytest.raises(RuntimeError): + test_model.save("saved_model") + + +def test_fit(fitted_linear_model_instance): + model = fitted_linear_model_instance + + x_pred = np.random.uniform(low=0, high=1, size=(100, 1)) + pred = model.predict(x_pred) + assert len(x_pred) == len(pred) + assert isinstance(pred, np.ndarray) + post_pred = model.predict_posterior(x_pred) + assert len(x_pred) == len(post_pred) + assert isinstance(post_pred, xr.DataArray) + + +@pytest.mark.skipif( + sys.platform == "win32", reason="Permissions for temp files not granted on windows CI." +) +def test_save_load(fitted_linear_model_instance): + model = fitted_linear_model_instance + temp = tempfile.NamedTemporaryFile(mode="w", encoding="utf-8", delete=False) + model.save(temp.name) + model2 = LinearModel.load(temp.name) + assert model.idata.groups() == model2.idata.groups() + + x_pred = np.random.uniform(low=0, high=1, size=(100, 1)) + pred1 = model.predict(x_pred) + pred2 = model2.predict(x_pred) + # Predictions should have similar statistical characteristics + assert pred1.mean() == pytest.approx(pred2.mean(), 1e-3) + assert pred1.var() == pytest.approx(pred2.var(), 1e-2) + temp.close() + + +def test_predict(fitted_linear_model_instance): + model = fitted_linear_model_instance + x_pred = np.random.uniform(low=0, high=1, size=(100, 1)) + pred = model.predict(x_pred) + assert len(x_pred) == len(pred) + assert np.issubdtype(pred.dtype, np.floating) + + +@pytest.mark.parametrize("combined", [True, False]) +def test_predict_posterior(fitted_linear_model_instance, combined): + model = fitted_linear_model_instance + n_pred = 150 + x_pred = np.random.uniform(low=0, high=1, size=(n_pred, 1)) + pred = model.predict_posterior(x_pred, combined=combined) + chains = model.idata.sample_stats.dims["chain"] + draws = model.idata.sample_stats.dims["draw"] + expected_shape = (n_pred, chains * draws) if combined else (chains, draws, n_pred) + assert pred.shape == expected_shape + assert np.issubdtype(pred.dtype, np.floating) + # TODO: check that extend_idata has the expected effect + + +@pytest.mark.parametrize("samples", [None, 300]) +@pytest.mark.parametrize("combined", [True, False]) +def test_sample_prior_predictive(samples, combined, sample_input): + x, y = sample_input + model = LinearModel() + prior_pred = model.sample_prior_predictive(x, samples, combined=combined)[model.output_var] + draws = model.sampler_config["draws"] if samples is None else samples + chains = 1 + expected_shape = (len(x), chains * draws) if combined else (chains, draws, len(x)) + assert prior_pred.shape == expected_shape + # TODO: check that extend_idata has the expected effect + + +def test_id(sample_input): + x, y = sample_input + model_config = { + "intercept": {"loc": 0, "scale": 10}, + "slope": {"loc": 0, "scale": 10}, + "obs_error": 2, + } + sampler_config = { + "draws": 1_000, + "tune": 1_000, + "chains": 3, + "target_accept": 0.95, + } + model = LinearModel(model_config=model_config, sampler_config=sampler_config) + + expected_id = hashlib.sha256( + str(model_config.values()).encode() + model.version.encode() + model._model_type.encode() + ).hexdigest()[:16] + + assert model.id == expected_id + + +@pytest.mark.skipif(not sklearn_available, reason="scikit-learn package is not available.") +def test_pipeline_integration(sample_input): + model_config = { + "intercept": {"loc": 0, "scale": 2}, + "slope": {"loc": 0, "scale": 2}, + "obs_error": 1, + "default_output_var": "y_hat", + } + model = Pipeline( + [ + ("input_scaling", StandardScaler()), + ( + "linear_model", + TransformedTargetRegressor(LinearModel(model_config), transformer=StandardScaler()), + ), + ] + ) + x, y = sample_input + model.fit(x, y) + + x_pred = np.random.uniform(low=0, high=1, size=(100, 1)) + model.predict(x_pred)