Coverage for HARK / distributions / multivariate.py: 93%
175 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-10 06:19 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-10 06:19 +0000
1from itertools import product
2from typing import List, Union
4import numpy as np
5from numpy import linalg
6from scipy import special
7from scipy.stats._multivariate import multi_rv_frozen, multivariate_normal_frozen
9from HARK.distributions.base import Distribution
10from HARK.distributions.continuous import Lognormal, Normal
11from HARK.distributions.discrete import DiscreteDistribution
13# MULTIVARIATE DISTRIBUTIONS
16class MultivariateNormal(multivariate_normal_frozen, Distribution):
17 """
18 A Multivariate Normal distribution.
20 Parameters
21 ----------
22 mu : numpy array
23 Mean vector.
24 Sigma : 2-d numpy array. Each dimension must have length equal to that of
25 mu.
26 Variance-covariance matrix.
27 seed : int
28 Seed for random number generator.
29 """
31 def __init__(self, mu=[1, 1], Sigma=[[1, 0], [0, 1]], seed=None):
32 self.mu = np.asarray(mu)
33 self.Sigma = np.asarray(Sigma)
34 self.M = self.mu.size
35 multivariate_normal_frozen.__init__(self, mean=self.mu, cov=self.Sigma)
36 Distribution.__init__(self, seed=seed)
37 self.infimum = -np.inf * np.ones(self.M)
38 self.supremum = np.inf * np.ones(self.M)
40 def discretize(self, N, method="hermite", endpoints=False):
41 """
42 For multivariate normal distributions, the Gauss-Hermite
43 quadrature rule is used as the default method for discretization.
44 """
46 return self._approx(N, method=method, endpoints=endpoints)
48 def _approx(self, N, method="hermite", endpoints=False):
49 """
50 Returns a discrete approximation of this distribution.
52 The discretization will have N**M points, where M is the dimension of
53 the multivariate normal.
55 It uses the fact that:
56 - Being positive definite, Sigma can be factorized as Sigma = QVQ',
57 with V diagonal. So, letting A=Q*sqrt(V), Sigma = A*A'.
58 - If Z is an N-dimensional multivariate standard normal, then
59 A*Z ~ N(0,A*A' = Sigma).
61 The idea therefore is to construct an equiprobable grid for a standard
62 normal and multiply it by matrix A.
63 """
65 # Start by computing matrix A.
66 v, Q = np.linalg.eig(self.Sigma)
67 sqrtV = np.diag(np.sqrt(v))
68 A = np.matmul(Q, sqrtV)
70 # Now find a discretization for a univariate standard normal.
72 z_approx = Normal().discretize(N, method=method)
74 # Now create the multivariate grid and pmv
75 Z = np.array(list(product(*[z_approx.atoms.flatten()] * self.M)))
76 pmv = np.prod(np.array(list(product(*[z_approx.pmv] * self.M))), axis=1)
78 # Apply mean and standard deviation to the Z grid
79 atoms = self.mu[None, ...] + np.matmul(Z, A.T)
81 limit = {
82 "dist": self,
83 "method": method,
84 "N": N,
85 "endpoints": endpoints,
86 }
88 # Construct and return discrete distribution
89 return DiscreteDistribution(
90 pmv,
91 atoms.T,
92 seed=self.random_seed(),
93 limit=limit,
94 )
97class MultivariateLogNormal(multi_rv_frozen, Distribution):
98 """
99 A Multivariate Lognormal distribution.
101 Parameters
102 ----------
103 mu : Union[list, numpy.ndarray], optional
104 Means of underlying multivariate normal, default [0.0, 0.0].
105 Sigma : Union[list, numpy.ndarray], optional
106 nxn variance-covariance matrix of underlying multivariate normal, default [[1.0, 0.0], [0.0, 1.0]].
107 seed : int, optional
108 Seed for random number generator, default 0.
109 """
111 def __init__(
112 self,
113 mu: Union[List, np.ndarray] = [0.0, 0.0],
114 Sigma: Union[List, np.ndarray] = [[1.0, 0.0], [0.0, 1.0]],
115 seed=None,
116 ):
117 self.mu = np.asarray(mu)
118 self.Sigma = np.asarray(Sigma)
119 self.M = self.mu.size
121 if self.Sigma.shape != (self.M, self.M):
122 raise AttributeError(f"Sigma must be a {self.M}x{self.M} matrix")
124 if np.array_equal(self.Sigma, self.Sigma.T):
125 res = np.all(np.linalg.eigvals(self.Sigma) >= 0)
126 if not res:
127 raise AttributeError("Sigma must be positive semi-definite")
128 else:
129 raise AttributeError("Sigma must be positive semi-definite")
131 multi_rv_frozen.__init__(self)
132 Distribution.__init__(self, seed=seed)
133 self.dstn = MultivariateNormal(mu=self.mu, Sigma=self.Sigma)
135 def mean(self):
136 """
137 Mean of the distribution.
139 Returns
140 -------
141 np.ndarray
142 Mean of the distribution.
143 """
145 return np.exp(self.mu + 0.5 * np.diag(self.Sigma))
147 def _cdf(self, x: Union[list, np.ndarray]):
148 """
149 Cumulative distribution function of the distribution evaluated at x.
151 Parameters
152 ----------
153 x : np.ndarray
154 Point at which to evaluate the CDF.
156 Returns
157 -------
158 float
159 CDF evaluated at x.
160 """
162 x = np.asarray(x)
163 if (x.shape != self.M) & (x.shape[1] != self.M):
164 raise ValueError(f"x must be and {self.M}-dimensional input")
165 return self.dstn.cdf(np.log(x))
167 def _pdf(self, x: Union[list, np.ndarray]):
168 """
169 Probability density function of the distribution evaluated at x.
171 Parameters
172 ----------
173 x : Union[list, np.ndarray]
174 Point at which to evaluate the PDF.
176 Returns
177 -------
178 float
179 PDF evaluated at x.
180 """
182 x = np.asarray(x)
184 if (x.shape != (self.M,)) & (x.shape[1] != self.M):
185 raise ValueError(f"x must be an {self.M}-dimensional input")
187 eigenvalues = linalg.eigvals(self.Sigma)
189 pseudo_det = np.prod(eigenvalues[eigenvalues > 1e-12])
191 inverse_sigma = linalg.pinv(self.Sigma)
193 rank_sigma = linalg.matrix_rank(self.Sigma)
195 pd = np.multiply(
196 (1 / np.prod(x, axis=1, keepdims=True)),
197 (2 * np.pi) ** (-rank_sigma / 2)
198 * pseudo_det ** (-0.5)
199 * np.exp(-(1 / 2) * np.multiply(np.log(x) @ inverse_sigma, np.log(x))),
200 )
201 return pd
203 def _marginal(self, x: Union[np.ndarray, float, list], dim: int):
204 """
205 Marginal distribution of one of the variables in the bivariate distribution evaluated at x.
207 Parameters
208 ----------
209 x : Union[np.ndarray, float]
210 Point at which to evaluate the marginal distribution.
211 dim : int
212 Which of the random variables to evaluate.
214 Returns
215 -------
216 float
217 Marginal distribution evaluated at x.
218 """
220 x = np.asarray(x)
221 x_dim = Lognormal(mu=self.mu[dim], sigma=np.sqrt(self.Sigma[dim, dim]))
222 return x_dim.pdf(x)
224 def _marginal_cdf(self, x: Union[np.ndarray, float, list], dim: int):
225 """
226 Cumulative distribution function of one of the variables from the bivariate distribution evaluated at x.
228 Parameters
229 ----------
230 x : Union[np.ndarray, float]
231 Point at which to evaluate the marginal CDF.
232 dim : int
233 Which of the random variables to evaluate.
235 Returns
236 -------
237 float
238 Marginal CDF evaluated at x.
239 """
241 x = np.asarray(x)
242 x_dim = Lognormal(mu=self.mu[dim], sigma=np.sqrt(self.Sigma[dim, dim]))
243 return x_dim.cdf(x)
245 def rvs(self, size: int = 1, random_state=None):
246 """
247 Random sample from the distribution.
249 Parameters
250 ----------
251 size : int
252 Number of data points to generate.
253 random_state : optional
254 Seed for random number generator.
256 Returns
257 -------
258 np.ndarray
259 Random sample from the distribution.
260 """
261 return np.exp(self.dstn.rvs(size, random_state=random_state))
263 @staticmethod
264 def _make_inners(N: int, tail_N: int) -> np.ndarray:
265 """Build the per-dimension ``inners`` index vector used for tail tagging."""
266 inners = np.zeros(N + 2 * tail_N)
267 if tail_N > 0:
268 inners[:tail_N] = [(tail_N - i) for i in range(tail_N)]
269 inners[-tail_N:] = [(i + 1) for i in range(tail_N)]
270 return inners
272 def _fill_interiors_row(
273 self,
274 interiors: np.ndarray,
275 inners: np.ndarray,
276 N: int,
277 tail_N: int,
278 i: int,
279 ) -> None:
280 """Populate ``interiors[i]`` by tiling ``inners`` for dimension ``i``."""
281 inners_i = [inners for _ in range((N + 2 * tail_N) ** i)]
282 interiors[i] = np.repeat([*inners_i], (N + 2 * tail_N) ** (self.M - (i + 1)))
284 def _diagonal_atoms(self, N, tail_N, tail_bound, endpoints):
285 """Build the atoms tensor for the case where Sigma is diagonal."""
286 ind_atoms = np.empty((self.M, N + 2 * tail_N))
287 for i in range(self.M):
288 if self.Sigma[i, i] == 0.0:
289 ind_atoms[i] = np.repeat(np.exp(self.mu[i]), N + 2 * tail_N)
290 else:
291 ind_atoms[i] = (
292 Lognormal(mu=self.mu[i], sigma=np.sqrt(self.Sigma[i, i]))
293 ._approx_equiprobable(
294 N, tail_N=tail_N, tail_bound=tail_bound, endpoints=endpoints
295 )
296 .atoms
297 )
298 atoms_list = [ind_atoms[i] for i in range(self.M)]
299 return np.stack(
300 [ar.flatten() for ar in list(np.meshgrid(*atoms_list))], axis=1
301 ).T
303 @staticmethod
304 def _compute_z_bins(N, tail_N, tail_bound):
305 """Return ``(z_bins, int_prob, tail_bound)`` shared by Cholesky/eig decomposition branches.
307 ``tail_bound`` is returned in normalized list form so that the caller
308 can store the consistent value in ``limit`` metadata.
309 """
310 if tail_bound is not None:
311 if isinstance(tail_bound, (float, np.floating)):
312 tail_bound = [tail_bound, 1 - tail_bound]
313 if tail_bound[0] < 0 or tail_bound[1] > 1:
314 raise ValueError("Tail bounds must be between 0 and 1")
315 cdf_cuts = np.linspace(tail_bound[0], tail_bound[1], N + 1)
316 int_prob = tail_bound[1] - tail_bound[0]
317 else:
318 cdf_cuts = np.linspace(0, 1, N + 1)
319 int_prob = 1.0
321 Z = Normal()
322 z_cuts = np.empty(2 * tail_N + N + 1)
323 if tail_N > 0:
324 z_cuts[0:tail_N] = Z.ppf(cdf_cuts[0])
325 z_cuts[-tail_N:] = Z.ppf(cdf_cuts[-1])
326 z_cuts[tail_N : tail_N + N + 1] = Z.ppf(cdf_cuts)
327 z_bins = [(z_cuts[i], z_cuts[i + 1]) for i in range(N + 2 * tail_N)]
328 return z_bins, int_prob, tail_bound
330 @staticmethod
331 def _eval_atom_density(params, z, N, int_prob):
332 """Compute the integral of the lognormal density over a multidimensional z bin."""
333 inds = []
334 excl = []
335 for j in range(len(z)):
336 if z[j, 0] == z[j, 1]:
337 excl.append(j)
338 elif params[j] != 0.0:
339 inds.append(j)
341 dim = len(inds)
342 if dim > 0:
343 p = np.repeat(params[inds], 2).reshape(dim, 2)
344 Z = np.multiply(p, z[inds])
345 bounds = ((p**2 - Z) / (np.sqrt(2) * p)).T
346 x_exp = np.prod(
347 -0.5
348 * np.exp(np.square(params[inds]) / 2)
349 * (special.erf(bounds[1]) - special.erf(bounds[0]))
350 )
351 else:
352 x_exp = 1
354 if len(excl) > 0:
355 x_others = np.prod(np.exp(np.multiply(params[excl], z[excl, 1])))
356 else:
357 x_others = 1
359 return x_exp * x_others * (N / int_prob) ** dim
361 def _fill_decomp_atoms(
362 self, decomp, atoms, interiors, inners, z_bins, N, tail_N, int_prob
363 ):
364 """Fill the atoms tensor for non-diagonal Sigma using the chosen decomposition."""
365 if decomp == "cholesky":
366 L = np.linalg.cholesky(self.Sigma)
367 for i in range(self.M):
368 params = L[i, 0 : i + 1]
369 num_z_dims = i + 1
370 xi_atoms = self._row_atoms(
371 self.mu[i], params, z_bins, num_z_dims, N, int_prob
372 )
373 self._fill_interiors_row(interiors, inners, N, tail_N, i)
374 atoms[i] = np.repeat(xi_atoms, (N + 2 * tail_N) ** (self.M - (i + 1)))
375 else:
376 Lambda, Q = np.linalg.eig(self.Sigma)
377 A = Q @ np.diag(np.sqrt(Lambda))
378 if decomp == "sqrt":
379 A = A @ Q.T
380 for i in range(self.M):
381 params = A[i]
382 xi_atoms = self._row_atoms(
383 self.mu[i], params, z_bins, self.M, N, int_prob
384 )
385 self._fill_interiors_row(interiors, inners, N, tail_N, i)
386 atoms[i] = xi_atoms
388 @classmethod
389 def _row_atoms(cls, mu_i, params, z_bins, num_z_dims, N, int_prob):
390 """Compute the per-row xi atoms by integrating the density over each z bin."""
391 Z_bins = [np.array(x) for x in product(*([z_bins] * num_z_dims))]
392 return [
393 np.exp(mu_i) * cls._eval_atom_density(params, z_bin, N, int_prob)
394 for z_bin in Z_bins
395 ]
397 def _approx_equiprobable(
398 self,
399 N: int,
400 endpoints: bool = False,
401 tail_bound: Union[float, list, tuple] = None,
402 decomp: str = "cholesky",
403 ):
404 """
405 Makes a discrete approximation using the equiprobable method to this multi-
406 variate lognormal distribution.
408 Parameters
409 ----------
410 N : int
411 The number of points in the discrete approximation.
412 tail_bound : Union[float, list, tuple], optional
413 The values of the CDF according to which the distribution is truncated.
414 If only a single number is specified, it is the lower tail bound and a
415 symmetric upper bound is chosen. Can make one-tailed approximations
416 with 0.0 or 1.0 as the lower and upper bound respectively. By default
417 the distribution is not truncated.
418 endpoints : bool
419 If endpoints is True, then atoms at the corner points of the truncated
420 region are included. By default, endpoints is False, which is when only
421 the interior points are included.
422 decomp : str in ["cholesky", "sqrt", "eig"], optional
423 The method of decomposing the covariance matrix. Available options are
424 the Cholesky decomposition, the positive-definite square root, and the
425 eigendecomposition. By default the Cholesky decomposition is used.
426 NOTE: The method of decomposition might affect the expectations of the
427 discretized distribution along each dimension dfferently.
429 Returns
430 -------
431 d : DiscreteDistribution
432 Probability associated with each point in array of discrete
433 points for discrete probability mass function.
434 """
436 tail_N = 1 if endpoints else 0
438 if decomp not in ["cholesky", "sqrt", "eig"]:
439 raise NotImplementedError(
440 "Decomposition method must be 'cholesky', 'sqrt' or 'eig'"
441 )
443 interiors = np.empty([self.M, (N + 2 * tail_N) ** (self.M)])
444 inners = self._make_inners(N, tail_N)
446 if np.array_equal(self.Sigma, np.diag(np.diag(self.Sigma))):
447 atoms = self._diagonal_atoms(N, tail_N, tail_bound, endpoints)
448 for i in range(self.M):
449 self._fill_interiors_row(interiors, inners, N, tail_N, i)
450 else:
451 z_bins, int_prob, tail_bound = self._compute_z_bins(N, tail_N, tail_bound)
452 atoms = np.empty([self.M, (N + (2 * tail_N)) ** self.M])
453 self._fill_decomp_atoms(
454 decomp, atoms, interiors, inners, z_bins, N, tail_N, int_prob
455 )
457 max_locs = np.argmax(np.abs(interiors), axis=0)
459 max_inds = np.stack([max_locs, np.arange(len(max_locs))], axis=1)
461 prob_locs = interiors[max_inds[:, 0], max_inds[:, 1]]
463 def prob_assign(x):
464 if x == 0:
465 return 1 / (N**self.M)
466 else:
467 return 0.0
469 prob_vec = np.vectorize(prob_assign)
471 pmv = prob_vec(prob_locs)
473 limit = {
474 "dist": self,
475 "method": "equiprobable",
476 "N": N,
477 "endpoints": endpoints,
478 "tail_bound": tail_bound,
479 "decomp": decomp,
480 }
482 return DiscreteDistribution(
483 pmv=pmv,
484 atoms=atoms,
485 seed=self.random_seed(),
486 limit=limit,
487 )