From be5722c049e81a7dce70bde76c2003e9b410fcc5 Mon Sep 17 00:00:00 2001 From: Michael Osthege Date: Sun, 11 Dec 2022 12:48:24 +0100 Subject: [PATCH 1/2] Fix flaky Moyal logcdf test --- pymc/tests/distributions/test_continuous.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pymc/tests/distributions/test_continuous.py b/pymc/tests/distributions/test_continuous.py index b69aea9df1..ba01ca0b62 100644 --- a/pymc/tests/distributions/test_continuous.py +++ b/pymc/tests/distributions/test_continuous.py @@ -778,10 +778,13 @@ def test_moyal_logp(self): reason="PyMC underflows earlier than scipy on float32", ) def test_moyal_logcdf(self): + # SciPy has new (?) precision issues at {mu=-2.1, sigma=0.5, x=2.1} + # We circumvent it by skipping sigma=0.5: + rplusbig = Domain([0, 0.9, 0.99, 1, 1.5, 2, 20, np.inf]) check_logcdf( pm.Moyal, R, - {"mu": R, "sigma": Rplusbig}, + {"mu": R, "sigma": rplusbig}, lambda value, mu, sigma: floatX(st.moyal.logcdf(value, mu, sigma)), ) if pytensor.config.floatX == "float32": From 82dfbf60b4da07b363a17f75d4145d5b3cb0d51f Mon Sep 17 00:00:00 2001 From: Maxim Kochurov Date: Fri, 9 Dec 2022 16:23:22 +0300 Subject: [PATCH 2/2] Refactor `Minibatch` and stop using `MRG_RandomStream` in VI Closes #4523 Closes #6277 --- pymc/data.py | 392 +++--------------- pymc/distributions/logprob.py | 5 +- pymc/model.py | 5 +- pymc/pytensorf.py | 2 +- pymc/tests/helpers.py | 2 +- pymc/tests/test_data.py | 83 ++-- pymc/tests/variational/test_approximations.py | 12 + pymc/tests/variational/test_inference.py | 32 +- pymc/variational/approximations.py | 18 +- pymc/variational/inference.py | 4 +- pymc/variational/operators.py | 11 +- pymc/variational/opvi.py | 51 ++- pymc/variational/stein.py | 3 +- 13 files changed, 193 insertions(+), 427 deletions(-) diff --git a/pymc/data.py b/pymc/data.py index ae4f42bb37..35a99607f4 100644 --- a/pymc/data.py +++ b/pymc/data.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import collections import io import os import pkgutil @@ -20,14 +19,19 @@ import warnings from copy import copy -from typing import Any, Dict, List, Optional, Sequence, Tuple, Union, cast +from typing import Dict, Optional, Sequence, Tuple, Union, cast import numpy as np import pytensor import pytensor.tensor as at from pytensor.compile.sharedvalue import SharedVariable -from pytensor.graph.basic import Apply +from pytensor.raise_op import Assert +from pytensor.scalar import Cast +from pytensor.tensor.elemwise import Elemwise +from pytensor.tensor.random import RandomStream +from pytensor.tensor.random.basic import IntegersRV +from pytensor.tensor.subtensor import AdvancedSubtensor from pytensor.tensor.type import TensorType from pytensor.tensor.var import TensorConstant, TensorVariable @@ -39,7 +43,6 @@ "get_data", "GeneratorAdapter", "Minibatch", - "align_minibatches", "Data", "ConstantData", "MutableData", @@ -126,351 +129,74 @@ def __hash__(self): return hash(id(self)) -class Minibatch(TensorVariable): - """Multidimensional minibatch that is pure TensorVariable +class MinibatchIndexRV(IntegersRV): + _print_name = ("minibatch_index", r"\operatorname{minibatch\_index}") - Parameters - ---------- - data: np.ndarray - initial data - batch_size: ``int`` or ``List[int|tuple(size, random_seed)]`` - batch size for inference, random seed is needed - for child random generators - dtype: ``str`` - cast data to specific type - broadcastable: tuple[bool] - change broadcastable pattern that defaults to ``(False, ) * ndim`` - name: ``str`` - name for tensor, defaults to "Minibatch" - random_seed: ``int`` - random seed that is used by default - update_shared_f: ``callable`` - returns :class:`ndarray` that will be carefully - stored to underlying shared variable - you can use it to change source of - minibatches programmatically - in_memory_size: ``int`` or ``List[int|slice|Ellipsis]`` - data size for storing in ``pytensor.shared`` - - Attributes - ---------- - shared: shared tensor - Used for storing data - minibatch: minibatch tensor - Used for training - - Notes - ----- - Below is a common use case of Minibatch with variational inference. - Importantly, we need to make PyMC "aware" that a minibatch is being used in inference. - Otherwise, we will get the wrong :math:`logp` for the model. - the density of the model ``logp`` that is affected by Minibatch. See more in the examples below. - To do so, we need to pass the ``total_size`` parameter to the observed node, which correctly scales - the density of the model ``logp`` that is affected by Minibatch. See more in the examples below. - - Examples - -------- - Consider we have `data` as follows: - - >>> data = np.random.rand(100, 100) - - if we want a 1d slice of size 10 we do - - >>> x = Minibatch(data, batch_size=10) - - Note that your data is cast to ``floatX`` if it is not integer type - But you still can add the ``dtype`` kwarg for :class:`Minibatch` - if you need more control. - - If we want 10 sampled rows and columns - ``[(size, seed), (size, seed)]`` we can use - - >>> x = Minibatch(data, batch_size=[(10, 42), (10, 42)], dtype='int32') - >>> assert str(x.dtype) == 'int32' - - - Or, more simply, we can use the default random seed = 42 - ``[size, size]`` - - >>> x = Minibatch(data, batch_size=[10, 10]) - - - In the above, `x` is a regular :class:`TensorVariable` that supports any math operations: - - - >>> assert x.eval().shape == (10, 10) - - - You can pass the Minibatch `x` to your desired model: - - >>> with pm.Model() as model: - ... 
mu = pm.Flat('mu') - ... sigma = pm.HalfNormal('sigma') - ... lik = pm.Normal('lik', mu, sigma, observed=x, total_size=(100, 100)) - - - Then you can perform regular Variational Inference out of the box - - - >>> with model: - ... approx = pm.fit() - - - Important note: :class:``Minibatch`` has ``shared``, and ``minibatch`` attributes - you can call later: - - >>> x.set_value(np.random.laplace(size=(100, 100))) - - and minibatches will be then from new storage - it directly affects ``x.shared``. - A less convenient convenient, but more explicit, way to achieve the same - thing: - - >>> x.shared.set_value(pm.floatX(np.random.laplace(size=(100, 100)))) - - The programmatic way to change storage is as follows - I import ``partial`` for simplicity - >>> from functools import partial - >>> datagen = partial(np.random.laplace, size=(100, 100)) - >>> x = Minibatch(datagen(), batch_size=10, update_shared_f=datagen) - >>> x.update_shared() - - To be more concrete about how we create a minibatch, here is a demo: - 1. create a shared variable - >>> shared = pytensor.shared(data) +minibatch_index = MinibatchIndexRV() - 2. take a random slice of size 10: - >>> ridx = pm.at_rng().uniform(size=(10,), low=0, high=data.shape[0]-1e-10).astype('int64') - - 3) take the resulting slice: - - >>> minibatch = shared[ridx] - - That's done. Now you can use this minibatch somewhere else. - You can see that the implementation does not require a fixed shape - for the shared variable. Feel free to use that if needed. - *FIXME: What is "that" which we can use here? A fixed shape? Should this say - "but feel free to put a fixed shape on the shared variable, if appropriate?"* - - Suppose you need to make some replacements in the graph, e.g. change the minibatch to testdata - - >>> node = x ** 2 # arbitrary expressions on minibatch `x` - >>> testdata = pm.floatX(np.random.laplace(size=(1000, 10))) - - Then you should create a `dict` with replacements: - - >>> replacements = {x: testdata} - >>> rnode = pytensor.clone_replace(node, replacements) - >>> assert (testdata ** 2 == rnode.eval()).all() - - *FIXME: In the following, what is the **reason** to replace the Minibatch variable with - its shared variable? And in the following, the `rnode` is a **new** node, not a modification - of a previously existing node, correct?* - To replace a minibatch with its shared variable you should do - the same things. The Minibatch variable is accessible through the `minibatch` attribute. - For example - - >>> replacements = {x.minibatch: x.shared} - >>> rnode = pytensor.clone_replace(node, replacements) - - For more complex slices some more code is needed that can seem not so clear - - >>> moredata = np.random.rand(10, 20, 30, 40, 50) +def is_minibatch(v: TensorVariable) -> bool: + return ( + isinstance(v.owner.op, AdvancedSubtensor) + and isinstance(v.owner.inputs[1].owner.op, MinibatchIndexRV) + and valid_for_minibatch(v.owner.inputs[0]) + ) - The default ``total_size`` that can be passed to PyMC random node - is then ``(10, 20, 30, 40, 50)`` but can be less verbose in some cases - 1. 
Advanced indexing, ``total_size = (10, Ellipsis, 50)`` +def valid_for_minibatch(v: TensorVariable) -> bool: + return ( + v.owner is None + # The only PyTensor operation we allow on observed data is type casting + # Although we could allow for any graph that does not depend on other RVs + or ( + isinstance(v.owner.op, Elemwise) + and v.owner.inputs[0].owner is None + and isinstance(v.owner.op.scalar_op, Cast) + ) + ) - >>> x = Minibatch(moredata, [2, Ellipsis, 10]) - We take the slice only for the first and last dimension +def assert_all_scalars_equal(scalar, *scalars): + if len(scalars) == 0: + return scalar + else: + return Assert( + "All variables shape[0] in Minibatch should be equal, check your Minibatch(data1, data2, ...) code" + )(scalar, at.all([scalar == s for s in scalars])) - >>> assert x.eval().shape == (2, 20, 30, 40, 10) - 2. Skipping a particular dimension, ``total_size = (10, None, 30)``: +def Minibatch(variable: TensorVariable, *variables: TensorVariable, batch_size: int): + """ + Get random slices from variables from the leading dimension. - >>> x = Minibatch(moredata, [2, None, 20]) - >>> assert x.eval().shape == (2, 20, 20, 40, 50) - 3. Mixing both of these together, ``total_size = (10, None, 30, Ellipsis, 50)``: + Parameters + ---------- + variable: TensorVariable + variables: TensorVariable + batch_size: int - >>> x = Minibatch(moredata, [2, None, 20, Ellipsis, 10]) - >>> assert x.eval().shape == (2, 20, 20, 40, 10) + Examples + -------- + >>> data1 = np.random.randn(100, 10) + >>> data2 = np.random.randn(100, 20) + >>> mdata1, mdata2 = Minibatch(data1, data2, batch_size=10) """ - RNG: Dict[str, List[Any]] = collections.defaultdict(list) - - @pytensor.config.change_flags(compute_test_value="raise") - def __init__( - self, - data, - batch_size=128, - dtype=None, - broadcastable=None, - shape=None, - name="Minibatch", - random_seed=42, - update_shared_f=None, - in_memory_size=None, - ): - if dtype is None: - data = pm.smartfloatX(np.asarray(data)) - else: - data = np.asarray(data, dtype) - in_memory_slc = self.make_static_slices(in_memory_size) - self.shared = pytensor.shared(data[tuple(in_memory_slc)]) - self.update_shared_f = update_shared_f - self.random_slc = self.make_random_slices(self.shared.shape, batch_size, random_seed) - minibatch = self.shared[self.random_slc] - if broadcastable is not None: - warnings.warn( - "Minibatch `broadcastable` argument is deprecated. 
Use `shape` instead", - FutureWarning, + rng = RandomStream() + tensor, *tensors = tuple(map(at.as_tensor, (variable, *variables))) + upper = assert_all_scalars_equal(*[t.shape[0] for t in (tensor, *tensors)]) + slc = rng.gen(minibatch_index, 0, upper, size=batch_size) + for i, v in enumerate((tensor, *tensors)): + if not valid_for_minibatch(v): + raise ValueError( + f"{i}: {v} is not valid for Minibatch, only constants or constants.astype(dtype) are allowed" ) - assert shape is None - shape = [1 if b else None for b in broadcastable] - if shape is not None: - minibatch = at.specify_shape(minibatch, shape) - self.minibatch = minibatch - super().__init__(self.minibatch.type, None, None, name=name) - Apply(pytensor.compile.view_op, inputs=[self.minibatch], outputs=[self]) - self.tag.test_value = copy(self.minibatch.tag.test_value) - - def rslice(self, total, size, seed): - if size is None: - return slice(None) - elif isinstance(size, int): - rng = pm.at_rng(seed) - Minibatch.RNG[id(self)].append(rng) - return rng.uniform(size=(size,), low=0.0, high=pm.floatX(total) - 1e-16).astype("int64") - else: - raise TypeError("Unrecognized size type, %r" % size) - - def __del__(self): - del Minibatch.RNG[id(self)] - - @staticmethod - def make_static_slices(user_size): - if user_size is None: - return [Ellipsis] - elif isinstance(user_size, int): - return slice(None, user_size) - elif isinstance(user_size, (list, tuple)): - slc = list() - for i in user_size: - if isinstance(i, int): - slc.append(i) - elif i is None: - slc.append(slice(None)) - elif i is Ellipsis: - slc.append(Ellipsis) - elif isinstance(i, slice): - slc.append(i) - else: - raise TypeError("Unrecognized size type, %r" % user_size) - return slc - else: - raise TypeError("Unrecognized size type, %r" % user_size) - - def make_random_slices(self, in_memory_shape, batch_size, default_random_seed): - if batch_size is None: - return [Ellipsis] - elif isinstance(batch_size, int): - slc = [self.rslice(in_memory_shape[0], batch_size, default_random_seed)] - elif isinstance(batch_size, (list, tuple)): - - def check(t): - if t is Ellipsis or t is None: - return True - else: - if isinstance(t, (tuple, list)): - if not len(t) == 2: - return False - else: - return isinstance(t[0], int) and isinstance(t[1], int) - elif isinstance(t, int): - return True - else: - return False - - # end check definition - if not all(check(t) for t in batch_size): - raise TypeError( - "Unrecognized `batch_size` type, expected " - "int or List[int|tuple(size, random_seed)] where " - "size and random seed are both ints, got %r" % batch_size - ) - batch_size = [(i, default_random_seed) if isinstance(i, int) else i for i in batch_size] - shape = in_memory_shape - if Ellipsis in batch_size: - sep = batch_size.index(Ellipsis) - begin = batch_size[:sep] - end = batch_size[sep + 1 :] - if Ellipsis in end: - raise ValueError( - "Double Ellipsis in `batch_size` is restricted, got %r" % batch_size - ) - if len(end) > 0: - shp_mid = shape[sep : -len(end)] - mid = [at.arange(s) for s in shp_mid] - else: - mid = [] - else: - begin = batch_size - end = [] - mid = [] - if (len(begin) + len(end)) > len(in_memory_shape.eval()): - raise ValueError( - "Length of `batch_size` is too big, " - "number of ints is bigger that ndim, got %r" % batch_size - ) - if len(end) > 0: - shp_end = shape[-len(end) :] - else: - shp_end = np.asarray([]) - shp_begin = shape[: len(begin)] - slc_begin = [ - self.rslice(shp_begin[i], t[0], t[1]) if t is not None else at.arange(shp_begin[i]) - for i, t in 
enumerate(begin) - ] - slc_end = [ - self.rslice(shp_end[i], t[0], t[1]) if t is not None else at.arange(shp_end[i]) - for i, t in enumerate(end) - ] - slc = slc_begin + mid + slc_end - else: - raise TypeError("Unrecognized size type, %r" % batch_size) - return pm.pytensorf.ix_(*slc) - - def update_shared(self): - if self.update_shared_f is None: - raise NotImplementedError("No `update_shared_f` was provided to `__init__`") - self.set_value(np.asarray(self.update_shared_f(), self.dtype)) - - def set_value(self, value): - self.shared.set_value(np.asarray(value, self.dtype)) - - def clone(self): - ret = self.type() - ret.name = self.name - ret.tag = copy(self.tag) - return ret - - -def align_minibatches(batches=None): - if batches is None: - for rngs in Minibatch.RNG.values(): - for rng in rngs: - rng.seed() - else: - for b in batches: - if not isinstance(b, Minibatch): - raise TypeError(f"{b} is not a Minibatch") - for rng in Minibatch.RNG[id(b)]: - rng.seed() + result = tuple([v[slc] for v in (tensor, *tensors)]) + for i, r in enumerate(result): + r.name = f"minibatch.{i}" + return result if tensors else result[0] def determine_coords( diff --git a/pymc/distributions/logprob.py b/pymc/distributions/logprob.py index 62b21bb0e2..eb9dbb454d 100644 --- a/pymc/distributions/logprob.py +++ b/pymc/distributions/logprob.py @@ -102,7 +102,8 @@ def _get_scaling(total_size: TOTAL_SIZE, shape, ndim: int) -> TensorVariable: def _check_no_rvs(logp_terms: Sequence[TensorVariable]): # Raise if there are unexpected RandomVariables in the logp graph - # Only SimulatorRVs are allowed + # Only SimulatorRVs MinibatchIndexRVs are allowed + from pymc.data import MinibatchIndexRV from pymc.distributions.simulator import SimulatorRV unexpected_rv_nodes = [ @@ -111,7 +112,7 @@ def _check_no_rvs(logp_terms: Sequence[TensorVariable]): if ( node.owner and isinstance(node.owner.op, RandomVariable) - and not isinstance(node.owner.op, SimulatorRV) + and not isinstance(node.owner.op, (SimulatorRV, MinibatchIndexRV)) ) ] if unexpected_rv_nodes: diff --git a/pymc/model.py b/pymc/model.py index 2bb703f144..fa8b4fcbc5 100644 --- a/pymc/model.py +++ b/pymc/model.py @@ -50,7 +50,7 @@ from pytensor.tensor.var import TensorConstant, TensorVariable from pymc.blocking import DictToArrayBijection, RaveledVars -from pymc.data import GenTensorVariable, Minibatch +from pymc.data import GenTensorVariable, is_minibatch from pymc.distributions.logprob import _joint_logp from pymc.distributions.transforms import _default_transform from pymc.exceptions import ( @@ -1329,7 +1329,7 @@ def register_rv( else: if ( isinstance(observed, Variable) - and not isinstance(observed, (GenTensorVariable, Minibatch)) + and not isinstance(observed, GenTensorVariable) and observed.owner is not None # The only PyTensor operation we allow on observed data is type casting # Although we could allow for any graph that does not depend on other RVs @@ -1337,6 +1337,7 @@ def register_rv( isinstance(observed.owner.op, Elemwise) and isinstance(observed.owner.op.scalar_op, Cast) ) + and not is_minibatch(observed) ): raise TypeError( "Variables that depend on other nodes cannot be used for observed data." 
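The refactored `Minibatch` above draws a single random index vector and applies it to every argument along the leading dimension, which is why `assert_all_scalars_equal` requires all inputs to share the same `shape[0]`; the `total_size` argument on the observed variable then rescales the minibatch logp to the full dataset, as the updated `register_rv` check and the tests further down exercise. A minimal usage sketch under those assumptions — the arrays, variable names, and fit settings below are illustrative, not part of this patch:

import numpy as np
import pymc as pm

# Illustrative data sharing a leading dimension of 1000 rows (not from the patch).
data_x = np.random.randn(1000, 3)
data_y = np.random.randn(1000)

# Both arrays are sliced with the same random minibatch index, 100 rows at a time.
mb_x, mb_y = pm.Minibatch(data_x, data_y, batch_size=100)

with pm.Model():
    beta = pm.Normal("beta", 0, 1, shape=3)
    sigma = pm.HalfNormal("sigma", 1)
    mu = mb_x @ beta
    # total_size rescales the minibatch logp back up to the full 1000 rows.
    pm.Normal("y", mu=mu, sigma=sigma, observed=mb_y, total_size=1000)
    approx = pm.fit(10_000, method="advi")

Because one shared index variable drives every returned slice, rows of `mb_x` and `mb_y` stay paired within each draw, unlike the per-dimension `(size, seed)` machinery removed above.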
diff --git a/pymc/pytensorf.py b/pymc/pytensorf.py index ad20dfa2eb..d186f95f22 100644 --- a/pymc/pytensorf.py +++ b/pymc/pytensorf.py @@ -47,10 +47,10 @@ ) from pytensor.graph.fg import FunctionGraph from pytensor.graph.op import Op -from pytensor.sandbox.rng_mrg import MRG_RandomStream as RandomStream from pytensor.scalar.basic import Cast from pytensor.tensor.basic import _as_tensor_variable from pytensor.tensor.elemwise import Elemwise +from pytensor.tensor.random import RandomStream from pytensor.tensor.random.op import RandomVariable from pytensor.tensor.random.var import ( RandomGeneratorSharedVariable, diff --git a/pymc/tests/helpers.py b/pymc/tests/helpers.py index fc2994136c..ab0d7c7a4b 100644 --- a/pymc/tests/helpers.py +++ b/pymc/tests/helpers.py @@ -26,7 +26,7 @@ from pytensor.gradient import verify_grad as at_verify_grad from pytensor.graph import ancestors from pytensor.graph.rewriting.basic import in2out -from pytensor.sandbox.rng_mrg import MRG_RandomStream as RandomStream +from pytensor.tensor.random import RandomStream from pytensor.tensor.random.op import RandomVariable import pymc as pm diff --git a/pymc/tests/test_data.py b/pymc/tests/test_data.py index 0182aa80ac..538e1e1baa 100644 --- a/pymc/tests/test_data.py +++ b/pymc/tests/test_data.py @@ -27,6 +27,7 @@ import pymc as pm +from pymc.data import is_minibatch from pymc.pytensorf import GeneratorOp, floatX from pymc.tests.helpers import SeededTest, select_by_precision @@ -696,15 +697,10 @@ def test_common_errors(self): def test_mixed1(self): with pm.Model(): - data = np.random.rand(10, 20, 30, 40, 50) - mb = pm.Minibatch(data, [2, None, 20, Ellipsis, 10]) - pm.Normal("n", observed=mb, total_size=(10, None, 30, Ellipsis, 50)) - - def test_mixed2(self): - with pm.Model(): - data = np.random.rand(10, 20, 30, 40, 50) - mb = pm.Minibatch(data, [2, None, 20]) - pm.Normal("n", observed=mb, total_size=(10, None, 30)) + data = np.random.rand(10, 20) + mb = pm.Minibatch(data, batch_size=5) + v = pm.Normal("n", observed=mb, total_size=10) + assert pm.logp(v, 1) is not None, "Check index is allowed in graph" def test_free_rv(self): with pm.Model() as model4: @@ -719,51 +715,28 @@ def test_free_rv(self): @pytest.mark.usefixtures("strict_float32") class TestMinibatch: - data = np.random.rand(30, 10, 40, 10, 50) + data = np.random.rand(30, 10) def test_1d(self): - mb = pm.Minibatch(self.data, 20) - assert mb.eval().shape == (20, 10, 40, 10, 50) - - def test_2d(self): - mb = pm.Minibatch(self.data, [(10, 42), (4, 42)]) - assert mb.eval().shape == (10, 4, 40, 10, 50) - - @pytest.mark.parametrize( - "batch_size, expected", - [ - ([(10, 42), None, (4, 42)], (10, 10, 4, 10, 50)), - ([(10, 42), Ellipsis, (4, 42)], (10, 10, 40, 10, 4)), - ([(10, 42), None, Ellipsis, (4, 42)], (10, 10, 40, 10, 4)), - ([10, None, Ellipsis, (4, 42)], (10, 10, 40, 10, 4)), - ], - ) - def test_special_batch_size(self, batch_size, expected): - mb = pm.Minibatch(self.data, batch_size) - assert mb.eval().shape == expected - - def test_cloning_available(self): - gop = pm.Minibatch(np.arange(100), 1) - res = gop**2 - shared = pytensor.shared(np.array([10])) - res1 = pytensor.clone_replace(res, {gop: shared}) - f = pytensor.function([], res1) - assert f() == np.array([100]) - - def test_align(self): - m = pm.Minibatch(np.arange(1000), 1, random_seed=1) - n = pm.Minibatch(np.arange(1000), 1, random_seed=1) - f = pytensor.function([], [m, n]) - n.eval() # not aligned - a, b = zip(*(f() for _ in range(1000))) - assert a != b - pm.align_minibatches() - a, b = zip(*(f() 
for _ in range(1000))) - assert a == b - n.eval() # not aligned - pm.align_minibatches([m]) - a, b = zip(*(f() for _ in range(1000))) - assert a != b - pm.align_minibatches([m, n]) - a, b = zip(*(f() for _ in range(1000))) - assert a == b + mb = pm.Minibatch(self.data, batch_size=20) + assert is_minibatch(mb) + assert mb.eval().shape == (20, 10) + + def test_allowed(self): + mb = pm.Minibatch(at.as_tensor(self.data).astype(int), batch_size=20) + assert is_minibatch(mb) + + def test_not_allowed(self): + with pytest.raises(ValueError, match="not valid for Minibatch"): + mb = pm.Minibatch(at.as_tensor(self.data) * 2, batch_size=20) + + def test_not_allowed2(self): + with pytest.raises(ValueError, match="not valid for Minibatch"): + mb = pm.Minibatch(self.data, at.as_tensor(self.data) * 2, batch_size=20) + + def test_assert(self): + with pytest.raises( + AssertionError, match=r"All variables shape\[0\] in Minibatch should be equal" + ): + d1, d2 = pm.Minibatch(self.data, self.data[::2], batch_size=20) + d1.eval() diff --git a/pymc/tests/variational/test_approximations.py b/pymc/tests/variational/test_approximations.py index 3048444ba3..796f7d3f86 100644 --- a/pymc/tests/variational/test_approximations.py +++ b/pymc/tests/variational/test_approximations.py @@ -168,3 +168,15 @@ def test_elbo_beta_kl(aux_total_size): np.testing.assert_allclose( elbo_via_total_size_scaled.eval(), elbo_via_beta_kl.eval(), rtol=0, atol=1e-1 ) + + +def test_seeding_advi_fit(): + with pm.Model(): + x = pm.Normal("x", 0, 10, initval="prior") + approx1 = pm.fit( + random_seed=42, n=10, method="advi", obj_optimizer=pm.adagrad_window, progressbar=False + ) + approx2 = pm.fit( + random_seed=42, n=10, method="advi", obj_optimizer=pm.adagrad_window, progressbar=False + ) + np.testing.assert_allclose(approx1.mean.eval(), approx2.mean.eval()) diff --git a/pymc/tests/variational/test_inference.py b/pymc/tests/variational/test_inference.py index 6e668c8a79..238b0ea091 100644 --- a/pymc/tests/variational/test_inference.py +++ b/pymc/tests/variational/test_inference.py @@ -15,6 +15,7 @@ import io import operator +import cloudpickle import numpy as np import pytensor import pytensor.tensor as at @@ -26,6 +27,7 @@ from pymc.pytensorf import intX from pymc.variational.inference import ADVI, ASVGD, SVGD, FullRankADVI +from pymc.variational.opvi import NotImplementedInference pytestmark = pytest.mark.usefixtures("strict_float32", "seeded_test") @@ -60,7 +62,7 @@ def simple_model_data(use_minibatch): d = n / sigma**2 + 1 / sigma0**2 mu_post = (n * np.mean(data) / sigma**2 + mu0 / sigma0**2) / d if use_minibatch: - data = pm.Minibatch(data) + data = pm.Minibatch(data, batch_size=128) return dict( n=n, data=data, @@ -118,7 +120,7 @@ def init_(**kw): @pytest.fixture(scope="function") def inference(inference_spec, simple_model): with simple_model: - return inference_spec() + return inference_spec(random_seed=42) @pytest.fixture(scope="function") @@ -129,7 +131,7 @@ def fit_kwargs(inference, use_minibatch): obj_optimizer=pm.adagrad_window(learning_rate=0.01, n_win=50), n=12000 ), (FullRankADVI, "full"): dict( - obj_optimizer=pm.adagrad_window(learning_rate=0.01, n_win=50), n=6000 + obj_optimizer=pm.adagrad_window(learning_rate=0.015, n_win=50), n=6000 ), (FullRankADVI, "mini"): dict( obj_optimizer=pm.adagrad_window(learning_rate=0.007, n_win=50), n=12000 @@ -149,6 +151,8 @@ def fit_kwargs(inference, use_minibatch): inference.approx.scale_cost_to_minibatch = False else: key = "full" + if (type(inference), key) in {(SVGD, "mini"), (ASVGD, 
"mini")}: + pytest.skip("Not Implemented Inference") return _select[(type(inference), key)] @@ -179,7 +183,10 @@ def test_fit_start(inference_spec, simple_model): with simple_model: inference = inference_spec(**kw) - trace = inference.fit(n=0).sample(10000) + try: + trace = inference.fit(n=0).sample(10000) + except NotImplementedInference as e: + pytest.skip(str(e)) np.testing.assert_allclose(np.mean(trace.posterior["mu"]), mu_init, rtol=0.05) if has_start_sigma: np.testing.assert_allclose(np.std(trace.posterior["mu"]), mu_sigma_init, rtol=0.05) @@ -218,6 +225,8 @@ def test_fit_fn_text(method, kwargs, error): def test_profile(inference): + if type(inference) in {SVGD, ASVGD}: + pytest.skip("Not Implemented Inference") inference.run_profiling(n=100).summary() @@ -239,8 +248,7 @@ def binomial_model_inference(binomial_model, inference_spec): @pytest.mark.xfail("pytensor.config.warn_float64 == 'raise'", reason="too strict float32") def test_replacements(binomial_model_inference): - d = at.bscalar() - d.tag.test_value = 1 + d = pytensor.shared(1) approx = binomial_model_inference.approx p = approx.model.p p_t = p**3 @@ -252,7 +260,7 @@ def test_replacements(binomial_model_inference): ), "p should be replaced" if pytensor.config.compute_test_value != "off": assert p_s.tag.test_value.shape == p_t.tag.test_value.shape - sampled = [p_s.eval() for _ in range(100)] + sampled = [pm.draw(p_s) for _ in range(100)] assert any(map(operator.ne, sampled[1:], sampled[:-1])) # stochastic p_z = approx.sample_node(p_t, deterministic=False, size=10) assert p_z.shape.eval() == (10,) @@ -264,15 +272,17 @@ def test_replacements(binomial_model_inference): try: p_d = approx.sample_node(p_t, deterministic=True) - sampled = [p_d.eval() for _ in range(100)] + sampled = [pm.draw(p_d) for _ in range(100)] assert all(map(operator.eq, sampled[1:], sampled[:-1])) # deterministic except opvi.NotImplementedInference: pass p_r = approx.sample_node(p_t, deterministic=d) - sampled = [p_r.eval({d: 1}) for _ in range(100)] + d.set_value(1) + sampled = [pm.draw(p_r) for _ in range(100)] assert all(map(operator.eq, sampled[1:], sampled[:-1])) # deterministic - sampled = [p_r.eval({d: 0}) for _ in range(100)] + d.set_value(0) + sampled = [pm.draw(p_r) for _ in range(100)] assert any(map(operator.ne, sampled[1:], sampled[:-1])) # stochastic @@ -325,8 +335,6 @@ def test_var_replacement(): def test_clear_cache(): - import cloudpickle - with pm.Model(): pm.Normal("n", 0, 1) inference = ADVI() diff --git a/pymc/variational/approximations.py b/pymc/variational/approximations.py index b615e804e1..9ebc02ef40 100644 --- a/pymc/variational/approximations.py +++ b/pymc/variational/approximations.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+ import numpy as np import pytensor @@ -24,11 +25,13 @@ from pymc.blocking import DictToArrayBijection from pymc.distributions.dist_math import rho2sigma +from pymc.pytensorf import makeiter from pymc.variational import opvi from pymc.variational.opvi import ( Approximation, Group, NotImplementedInference, + _known_scan_ignored_inputs, node_property, ) @@ -248,9 +251,12 @@ def randidx(self, size=None): pass else: size = tuple(np.atleast_1d(size)) - return self._rng.uniform( - size=size, low=pm.floatX(0), high=pm.floatX(self.histogram.shape[0]) - pm.floatX(1e-16) - ).astype("int32") + return at.random.integers( + size=size, + low=0, + high=self.histogram.shape[0], + rng=pytensor.shared(np.random.default_rng()), + ) def _new_initial(self, size, deterministic, more_replacements=None): pytensor_condition_is_here = isinstance(deterministic, Variable) @@ -383,8 +389,10 @@ def evaluate_over_trace(self, node): """ node = self.to_flat_input(node) - def sample(post, node): + def sample(post, *_): return pytensor.clone_replace(node, {self.input: post}) - nodes, _ = pytensor.scan(sample, self.histogram, non_sequences=[node]) + nodes, _ = pytensor.scan( + sample, self.histogram, non_sequences=_known_scan_ignored_inputs(makeiter(node)) + ) return nodes diff --git a/pymc/variational/inference.py b/pymc/variational/inference.py index ea2868d10a..90bee1623c 100644 --- a/pymc/variational/inference.py +++ b/pymc/variational/inference.py @@ -631,7 +631,9 @@ def __init__(self, approx=None, estimator=KSD, kernel=test_functions.rbf, **kwar "is often **underestimated** when using temperature = 1." ) if approx is None: - approx = FullRank(model=kwargs.pop("model", None)) + approx = FullRank( + model=kwargs.pop("model", None), random_seed=kwargs.pop("random_seed", None) + ) super().__init__(estimator=estimator, approx=approx, kernel=kernel, **kwargs) def fit( diff --git a/pymc/variational/operators.py b/pymc/variational/operators.py index 7464cb31de..ff3a416969 100644 --- a/pymc/variational/operators.py +++ b/pymc/variational/operators.py @@ -20,7 +20,12 @@ import pymc as pm from pymc.variational import opvi -from pymc.variational.opvi import ObjectiveFunction, Operator +from pymc.variational.opvi import ( + NotImplementedInference, + ObjectiveFunction, + Operator, + _known_scan_ignored_inputs, +) from pymc.variational.stein import Stein __all__ = ["KL", "KSD"] @@ -136,6 +141,10 @@ def __init__(self, approx, temperature=1): def apply(self, f): # f: kernel function for KSD f(histogram) -> (k(x,.), \nabla_x k(x,.)) + if _known_scan_ignored_inputs([self.approx.model.logp()]): + raise NotImplementedInference( + "SVGD does not currently support Minibatch or Simulator RV" + ) stein = Stein( approx=self.approx, kernel=f, diff --git a/pymc/variational/opvi.py b/pymc/variational/opvi.py index a76e2b8b78..018d65b010 100644 --- a/pymc/variational/opvi.py +++ b/pymc/variational/opvi.py @@ -67,10 +67,10 @@ from pymc.model import modelcontext from pymc.pytensorf import ( SeedSequenceSeed, - at_rng, compile_pymc, find_rng_nodes, identity, + makeiter, reseed_rngs, ) from pymc.util import ( @@ -109,6 +109,18 @@ class GroupError(VariationalInferenceError, TypeError): """Error related to VI groups""" +def _known_scan_ignored_inputs(terms): + # TODO: remove when scan issue with grads is fixed + from pymc.data import MinibatchIndexRV + from pymc.distributions.simulator import SimulatorRV + + return [ + n.owner.inputs[0] + for n in pytensor.graph.ancestors(terms) + if n.owner is not None and isinstance(n.owner.op, (MinibatchIndexRV, 
SimulatorRV)) + ] + + def append_name(name): def wrap(f): if name is None: @@ -370,10 +382,11 @@ def step_function( more_replacements=more_replacements, total_grad_norm_constraint=total_grad_norm_constraint, ) + seed = self.approx.rng.randint(2**30, dtype=np.int64) if score: - step_fn = compile_pymc([], updates.loss, updates=updates, **fn_kwargs) + step_fn = compile_pymc([], updates.loss, updates=updates, random_seed=seed, **fn_kwargs) else: - step_fn = compile_pymc([], [], updates=updates, **fn_kwargs) + step_fn = compile_pymc([], [], updates=updates, random_seed=seed, **fn_kwargs) return step_fn @pytensor.config.change_flags(compute_test_value="off") @@ -402,7 +415,8 @@ def score_function( if more_replacements is None: more_replacements = {} loss = self(sc_n_mc, more_replacements=more_replacements) - return compile_pymc([], loss, **fn_kwargs) + seed = self.approx.rng.randint(2**30, dtype=np.int64) + return compile_pymc([], loss, random_seed=seed, **fn_kwargs) @pytensor.config.change_flags(compute_test_value="off") def __call__(self, nmc, **kwargs): @@ -725,8 +739,7 @@ def __init__( options = dict() self.options = options self._vfam = vfam - self.rng = np.random.default_rng(random_seed) - self._rng = at_rng(random_seed) + self.rng = np.random.RandomState(random_seed) model = modelcontext(model) self.model = model self.group = group @@ -747,7 +760,7 @@ def _prepare_start(self, start=None): jitter_rvs={}, return_transformed=True, ) - start = ipfn(self.rng.integers(2**30, dtype=np.int64)) + start = ipfn(self.rng.randint(2**30, dtype=np.int64)) group_vars = {self.model.rvs_to_values[v].name for v in self.group} start = {k: v for k, v in start.items() if k in group_vars} start = DictToArrayBijection.map(start).data @@ -943,9 +956,9 @@ def _new_initial(self, size, deterministic, more_replacements=None): if deterministic: return at.ones(shape, dtype) * dist_map else: - return getattr(self._rng, dist_name)(size=shape) + return getattr(at.random, dist_name)(size=shape) else: - sample = getattr(self._rng, dist_name)(size=shape) + sample = getattr(at.random, dist_name)(size=shape) initial = at.switch(deterministic, at.ones(shape, dtype) * dist_map, sample) return initial @@ -985,9 +998,14 @@ def set_size_and_deterministic( ------- :class:`Variable` with applied replacements, ready to use """ + flat2rand = self.make_size_and_deterministic_replacements(s, d, more_replacements) node_out = pytensor.clone_replace(node, flat2rand) + assert not ( + set(makeiter(self.input)) & set(pytensor.graph.graph_inputs(makeiter(node_out))) + ) try_to_set_test_value(node, node_out, s) + assert self.symbolic_random not in set(pytensor.graph.graph_inputs(makeiter(node_out))) return node_out def to_flat_input(self, node): @@ -1002,10 +1020,13 @@ def symbolic_sample_over_posterior(self, node): random = self.symbolic_random.astype(self.symbolic_initial.dtype) random = at.specify_shape(random, self.symbolic_initial.type.shape) - def sample(post, node): + def sample(post, *_): return pytensor.clone_replace(node, {self.input: post}) - nodes, _ = pytensor.scan(sample, random, non_sequences=[node]) + nodes, _ = pytensor.scan( + sample, random, non_sequences=_known_scan_ignored_inputs(makeiter(random)) + ) + assert self.input not in set(pytensor.graph.graph_inputs(makeiter(nodes))) return nodes def symbolic_single_sample(self, node): @@ -1343,6 +1364,7 @@ def set_size_and_deterministic(self, node, s, d, more_replacements=None): flat2rand = self.make_size_and_deterministic_replacements(s, d, more_replacements) node = 
pytensor.clone_replace(node, optimizations) node = pytensor.clone_replace(node, flat2rand) + assert not (set(self.symbolic_randoms) & set(pytensor.graph.graph_inputs(makeiter(node)))) try_to_set_test_value(_node, node, s) return node @@ -1356,12 +1378,15 @@ def symbolic_sample_over_posterior(self, node, more_replacements=None): """*Dev* - performs sampling of node applying independent samples from posterior each time. Note that it is done symbolically and this node needs :func:`set_size_and_deterministic` call """ - node = self.to_flat_input(node, more_replacements=more_replacements) + node = self.to_flat_input(node) def sample(*post): return pytensor.clone_replace(node, dict(zip(self.inputs, post))) - nodes, _ = pytensor.scan(sample, self.symbolic_randoms) + nodes, _ = pytensor.scan( + sample, self.symbolic_randoms, non_sequences=_known_scan_ignored_inputs(makeiter(node)) + ) + assert not (set(self.inputs) & set(pytensor.graph.graph_inputs(makeiter(nodes)))) return nodes def symbolic_single_sample(self, node, more_replacements=None): diff --git a/pymc/variational/stein.py b/pymc/variational/stein.py index 5d36a074ea..a02fe0f0a0 100644 --- a/pymc/variational/stein.py +++ b/pymc/variational/stein.py @@ -46,7 +46,8 @@ def approx_symbolic_matrices(self): @node_property def dlogp(self): - grad = at.grad(self.logp_norm.sum(), self.approx_symbolic_matrices) + logp = self.logp_norm.sum() + grad = at.grad(logp, self.approx_symbolic_matrices) def flatten2(tensor): return tensor.flatten(2)
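Taken together, these changes replace the `MRG_RandomStream`-based sampling with PyTensor's `RandomStream` and thread an explicit seed from the approximation's RNG into the compiled step and score functions, so passing `random_seed` to `pm.fit` makes repeated ADVI fits reproducible, as the new `test_seeding_advi_fit` above checks. A minimal sketch of that usage, with an illustrative model and settings:

import numpy as np
import pymc as pm

with pm.Model():
    pm.Normal("x", 0, 10)
    # Two fits with the same random_seed should produce identical variational parameters.
    approx1 = pm.fit(n=10, method="advi", random_seed=42, progressbar=False)
    approx2 = pm.fit(n=10, method="advi", random_seed=42, progressbar=False)

np.testing.assert_allclose(approx1.mean.eval(), approx2.mean.eval())

Note that, per the `operators.py` change above, SVGD and ASVGD still raise `NotImplementedInference` when Minibatch or Simulator variables appear in the logp graph.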