Source code for occuspytial.gibbs.probit

import numpy as np
from numpy.linalg import multi_dot
from scipy.linalg import solve_triangular
from scipy.special import ndtr, ndtri  # std norm cdf and its inverse

from ..distributions import precision_mvnorm

from .base import GibbsBase


def truncnorm_inf_ppf(a, p):
    """Return PPF from right hand tail of truncated normal distribution.

    Return values from the inteval (a, np.inf) for small |a|.
    """
    return -ndtri(ndtr(-a) * (1.0 - p))


def truncnorm_neginf_ppf(b, p):
    """Return PPF from left hand tail of truncated normal distribution.

    Return values from the inteval (-np.inf, b) for small |b|.
    """
    return ndtri(ndtr(b) * p)


[docs]class ProbitRSRGibbs(GibbsBase): r"""Sampler using probit link and RSR model for spatial random effects. This algorithm is an implementation of the gibbs sampler in [1]_ where a Reduced Spatial Regression (RSR) model is used to account for spatial autocorrelation in a single-season site occupancy model, but a probit link function is used instead of a logit. Parameters ---------- Q : np.ndarray Spatial precision matrix of spatial random effects. W : Dict[int, np.ndarray] Dictionary of detection corariates where the keys are the site numbers of the surveyed sites and the values are arrays containing the design matrix of each corresponding site. X : np.ndarray Design matrix of species occupancy covariates. y : Dict[int, np.ndarray] Dictionary of survey data where the keys are the site numbers of the surveyed sites and the values are number arrays of 1's and 0's where 0's indicate "no detection" and 1's indicate "detection". The length of each array equals the number of visits in the corresponding site. hparams : {None, Dict[str, Union[float, np.ndarray]}, optional Hyperparameters of the occupancy model. valid keys for the dictionary are: - ``a_mu``: mean of the normal prior of detection covariates. - ``a_prec``: precision matrix of the normal prior of detection covariates. - ``b_mu``: mean of the normal prior of occupancy covariates. - ``b_prec``: precision matrix of the normal prior of occupancy covariates. - ``tau_rate``: rate parameter of the Gamma prior of the spatial parameter. - ``tau_shape``: shape parameter of the Gamma prior of the spatial parameter. random_state : {None, int, numpy.random.SeedSequence} A seed to initialize the bitgenerator. r : float, optional The threshold of non-negative eigenvalues to keep of the Moran matrix to form the RSR precision matrix. Defaults to 0.5, meaning only columns of the Moran matrix that have corresponding eigenvalues greater than 0.5 will be used. If `q` is set, then this parameter is ignored. q : int, optional The number of columns of the Moran matrix to use in order to form the spatial precision matrix of the RSR model. If this parameter is used, then the value of the `r` parameter is ignore. If the value is None, then the default value of `r` is used to create the spatial precision matrix of the RSR model. Defaults to None. Methods ------- sample(size, burnin=0, start=None, chains=1, progressbar=True) See Also -------- occuspytial.gibbs.probit.LogitRSRGibbs : The RSR gibbs sampler using a Logit link function. Notes ----- When the assumption that the random effect :math:`\\epsilon` is normally distributed fails to hold, then a functional form misspecification issue arises: if the model is still estimated as a probit model, the estimators of the coefficients :math:`\\beta` are inconsistent [1]_. References ---------- .. [1] Joachim Inkmann(2000). Misspecified heteroskedasticity in the panel probit model: A small sample comparison of GMM and SML estimators. Journal of Econometrics, 97(2), 227-259 """ def __init__( self, Q, W, X, y, hparams=None, random_state=None, r=0.5, q=None ): super().__init__(Q, W, X, y, hparams, random_state) self._configure(Q, hparams, q, r) def _configure(self, Q, hparams, q, r): super()._configure(Q, hparams) self.state.omega_b = np.zeros(self.fixed.n) self.fixed.XTX_plus_bprec = self.X.T @ self.X + self.fixed.b_prec self.fixed.eps_chol_factor = np.ones(self.X.shape[0]) / np.sqrt(2) # XTX_i = inv(self.X.T @ self.X) chol = np.linalg.cholesky(self.X.T @ self.X) z = solve_triangular(chol, np.eye(self.X.shape[1]), lower=True) XTX_i = solve_triangular(chol, z, lower=True, trans=1) # P = I - X @ XTX_i @ XT P = -multi_dot([self.X, XTX_i, self.X.T]) P[np.diag_indices_from(P)] += 1 A = self.fixed.Q.copy() A.data = -A.data A.setdiag(0) omega = self.fixed.n * (P.T @ A @ P) / A.sum() w, v = np.linalg.eigh(omega) if q: self.fixed.q = q else: if not 0 <= r <= 1: raise ValueError('Threshold value needs to be in [0, 1]') # number of eigenvalues > threshold self.fixed.q = w[w >= r].size if not self.fixed.q: raise ValueError( 'The Moran Operator Matrix of the data has no positive ' 'eigenvalues. Set threshold to a lower value' ) # keep first q eigenvectors of ordered eigens K = v[:, -self.fixed.q:] self.fixed.KTK = K.T @ K # replace Q with Minv Q_copy = self.fixed.Q del self.fixed.Q self.fixed.Q = K.T @ Q_copy @ K self.fixed.K = K if not hparams: # `_set_default_hyperparams` has been called so modify tau_shape del self.fixed.tau_shape self.fixed.tau_shape = 0.5 + 0.5 * self.fixed.q def _initialize_default_start(self, state): state = super()._initialize_default_start(state) state.eta = self.rng.normal(scale=5, size=self.fixed.q) state.spatial = self.fixed.K @ self.state.eta state.eps = self.rng.standard_normal(self.fixed.n) return state def _initialize_posterior_state(self, start=None): if start is None: self.state = self._initialize_default_start(self.state) else: self.state.alpha = start['alpha'] self.state.beta = start['beta'] self.state.tau = start['tau'] self.state.eta = start['eta'] self.state.eps = start['eps'] self.state.spatial = self.fixed.K @ self.state.eta def _update_omega_a(self): """Update the latent variable ``omega_a``. This variable is ssociated with the cofficients of the conditional detection covariates. """ # get occopancy state of the sites where species was not observed. not_obs_occupancy = [i for i in self.fixed.not_obs if self.state.z[i]] self.state.exists = self.fixed.obs + not_obs_occupancy obs_mask = (self.y[self.state.exists] == 1) self.state.W = self.W[self.state.exists] loc = self.state.W @ self.state.alpha self.state.omega_a = np.zeros_like(loc) # sample from truncated normal distribution in the intervals (0, inf) # and (-inf, 0) using the inverse transform method # source: https://github.com/scipy/scipy/issues/12370 a = loc[obs_mask] b = loc[~obs_mask] Ua = self.rng.random(size=a.shape[0]) self.state.omega_a[obs_mask] = truncnorm_inf_ppf(-a, Ua) + a Ub = self.rng.random(size=b.shape[0]) self.state.omega_a[~obs_mask] = truncnorm_neginf_ppf(-b, Ub) + b def _update_omega_b(self): """Update the latent variable ``omega_b``. This variable is associated with the cofficients of the occupancy covariates. """ loc = self.X @ self.state.beta + self.state.spatial + self.state.eps exist_mask = (self.state.z == 1) a = loc[exist_mask] b = loc[~exist_mask] Ua = self.rng.random(size=a.shape[0]) self.state.omega_b[exist_mask] = truncnorm_inf_ppf(-a, Ua) + a Ub = self.rng.random(size=b.shape[0]) self.state.omega_b[~exist_mask] = truncnorm_neginf_ppf(-b, Ub) + b def _update_tau(self): eta = self.state.eta rate = 0.5 * eta @ self.fixed.Q @ eta + self.fixed.tau_rate self.state.tau = self.rng.gamma(self.fixed.tau_shape, 1 / rate) def _update_eps(self): mean = 0.5 * ( self.state.omega_b - self.X @ self.state.beta - self.state.spatial ) std = self.rng.standard_normal(mean.shape[0]) self.state.eps = mean + self.fixed.eps_chol_factor * std def _update_eta(self): K = self.fixed.K eps = self.state.eps A = self.fixed.KTK + self.state.tau * self.fixed.Q b = K.T @ (self.state.omega_b - self.X @ self.state.beta - eps) self.state.eta = precision_mvnorm(b, A, self.rng) self.state.spatial = self.fixed.K @ self.state.eta def _update_alpha(self): WT = self.state.W.T A = WT @ WT.T + self.fixed.a_prec b = self.fixed.a_prec_by_mu + WT @ self.state.omega_a self.state.alpha = precision_mvnorm(b, A, self.rng) def _update_beta(self): b = self.fixed.b_prec_by_mu + self.X.T @ ( self.state.omega_b - self.state.spatial - self.state.eps ) self.state.beta = precision_mvnorm( b, self.fixed.XTX_plus_bprec, self.rng ) def _update_z(self): no = self.fixed.not_obs ns = self.fixed.not_surveyed beta = self.state.beta K_eta = self.state.spatial num1 = ndtr(self.X[no] @ beta + K_eta[no] + self.state.eps[no]) num2 = 1 - ndtr(self.fixed.W_not_obs @ self.state.alpha) stack_prod = np.multiply.reduceat(num2, self.fixed.stacked_w_indices) num = num1 * stack_prod p = num / ((1 - num1) + num) self.state.z[no] = self.rng.uniform(size=self.fixed.n_no) < p if ns: p = ndtr(self.X[ns] @ beta + K_eta[ns] + self.state.eps[ns]) self.state.z[ns] = self.rng.uniform(size=self.fixed.n_ns) < p
[docs] def step(self): self._update_omega_b() self._update_tau() self._update_eps() self._update_eta() self._update_beta() self._update_omega_a() self._update_alpha() self._update_z()