Coverage for HARK/distributions/multivariate.py: 28%
179 statements
coverage.py v7.11.0, created at 2025-11-02 05:14 +0000
from itertools import product
from typing import List, Union

import numpy as np
from numpy import linalg
from scipy import special
from scipy.stats._multivariate import multi_rv_frozen, multivariate_normal_frozen

from HARK.distributions.base import Distribution
from HARK.distributions.continuous import Lognormal, Normal
from HARK.distributions.discrete import DiscreteDistribution

# MULTIVARIATE DISTRIBUTIONS


class MultivariateNormal(multivariate_normal_frozen, Distribution):
    """
    A Multivariate Normal distribution.

    Parameters
    ----------
    mu : numpy array
        Mean vector.
    Sigma : 2-d numpy array
        Variance-covariance matrix. Each dimension must have length equal to
        that of mu.
    seed : int
        Seed for random number generator.
    """

    def __init__(self, mu=[1, 1], Sigma=[[1, 0], [0, 1]], seed=0):
        self.mu = np.asarray(mu)
        self.Sigma = np.asarray(Sigma)
        self.M = self.mu.size
        multivariate_normal_frozen.__init__(self, mean=self.mu, cov=self.Sigma)
        Distribution.__init__(self, seed=seed)
        self.infimum = -np.inf * np.ones(self.M)
        self.supremum = np.inf * np.ones(self.M)

    def discretize(self, N, method="hermite", endpoints=False):
        """
        For multivariate normal distributions, the Gauss-Hermite
        quadrature rule is used as the default method for discretization.
        """
        return self._approx(N, method=method, endpoints=endpoints)

    def _approx(self, N, method="hermite", endpoints=False):
        """
        Returns a discrete approximation of this distribution.

        The discretization will have N**M points, where M is the dimension of
        the multivariate normal.

        It uses the fact that:
        - Being positive definite, Sigma can be factorized as Sigma = QVQ',
          with V diagonal. So, letting A = Q*sqrt(V), Sigma = A*A'.
        - If Z is an M-dimensional multivariate standard normal, then
          A*Z ~ N(0, A*A') = N(0, Sigma).

        The idea therefore is to construct a discretized grid for a univariate
        standard normal, take its M-fold product, and transform it with the
        matrix A.
        """
        # Start by computing matrix A.
        v, Q = np.linalg.eig(self.Sigma)
        sqrtV = np.diag(np.sqrt(v))
        A = np.matmul(Q, sqrtV)

        # Now find a discretization for a univariate standard normal.
        z_approx = Normal().discretize(N, method=method)

        # Now create the multivariate grid and pmv
        Z = np.array(list(product(*[z_approx.atoms.flatten()] * self.M)))
        pmv = np.prod(np.array(list(product(*[z_approx.pmv] * self.M))), axis=1)

        # Apply the mean vector and covariance structure to the Z grid
        atoms = self.mu[None, ...] + np.matmul(Z, A.T)

        limit = {
            "dist": self,
            "method": method,
            "N": N,
            "endpoints": endpoints,
        }

        # Construct and return discrete distribution
        return DiscreteDistribution(
            pmv,
            atoms.T,
            seed=self._rng.integers(0, 2**31 - 1, dtype="int32"),
            limit=limit,
        )
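
# Example (editor's sketch, not part of the module): a quick check of the
# factorization used in MultivariateNormal._approx. With A = Q*sqrt(V) from the
# eigendecomposition of Sigma, A @ A.T recovers Sigma, and the Gauss-Hermite
# discretization reproduces the mean vector. The values below are illustrative.
#
#     Sigma = np.array([[1.0, 0.5], [0.5, 2.0]])
#     v, Q = np.linalg.eig(Sigma)
#     A = Q @ np.diag(np.sqrt(v))
#     assert np.allclose(A @ A.T, Sigma)
#
#     dist = MultivariateNormal(mu=[1.0, -1.0], Sigma=Sigma)
#     approx = dist.discretize(7)
#     assert np.allclose(approx.atoms @ approx.pmv, [1.0, -1.0])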


class MultivariateLogNormal(multi_rv_frozen, Distribution):
    """
    A Multivariate Lognormal distribution.

    Parameters
    ----------
    mu : Union[list, numpy.ndarray], optional
        Means of the underlying multivariate normal, default [0.0, 0.0].
    Sigma : Union[list, numpy.ndarray], optional
        nxn variance-covariance matrix of the underlying multivariate normal,
        default [[1.0, 0.0], [0.0, 1.0]].
    seed : int, optional
        Seed for random number generator, default None.
    """

    def __init__(
        self,
        mu: Union[List, np.ndarray] = [0.0, 0.0],
        Sigma: Union[List, np.ndarray] = [[1.0, 0.0], [0.0, 1.0]],
        seed=None,
    ):
        self.mu = np.asarray(mu)
        self.Sigma = np.asarray(Sigma)
        self.M = self.mu.size

        if self.Sigma.shape != (self.M, self.M):
            raise AttributeError(f"Sigma must be a {self.M}x{self.M} matrix")

        if np.array_equal(self.Sigma, self.Sigma.T):
            res = np.all(np.linalg.eigvals(self.Sigma) >= 0)
            if not res:
                raise AttributeError("Sigma must be positive semi-definite")
        else:
            raise AttributeError("Sigma must be a symmetric matrix")

        multi_rv_frozen.__init__(self)
        Distribution.__init__(self, seed=seed)

    def mean(self):
        """
        Mean of the distribution.

        Returns
        -------
        np.ndarray
            Mean of the distribution, exp(mu + diag(Sigma) / 2) element-wise.
        """
        return np.exp(self.mu + 0.5 * np.diag(self.Sigma))
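
    # Example (editor's sketch): with mu = [0, 0] and unit variances, each
    # component mean is exp(0 + 0.5) ≈ 1.6487, matching the lognormal formula
    # exp(mu_i + Sigma_ii / 2):
    #
    #     d = MultivariateLogNormal(mu=[0.0, 0.0], Sigma=[[1.0, 0.0], [0.0, 1.0]], seed=0)
    #     d.mean()  # array([1.64872127, 1.64872127])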

    def _cdf(self, x: Union[list, np.ndarray]):
        """
        Cumulative distribution function of the distribution evaluated at x.

        Parameters
        ----------
        x : np.ndarray
            Point(s) at which to evaluate the CDF.

        Returns
        -------
        float
            CDF evaluated at x.
        """
        x = np.asarray(x)

        if (x.shape != (self.M,)) and (x.shape[-1] != self.M):
            raise ValueError(f"x must be an {self.M}-dimensional input")

        return MultivariateNormal(mu=self.mu, Sigma=self.Sigma).cdf(np.log(x))

    def _pdf(self, x: Union[list, np.ndarray]):
        """
        Probability density function of the distribution evaluated at x.

        Parameters
        ----------
        x : Union[list, np.ndarray]
            Point(s) at which to evaluate the PDF.

        Returns
        -------
        float
            PDF evaluated at x.
        """
        x = np.asarray(x)

        if (x.shape != (self.M,)) and (x.shape[-1] != self.M):
            raise ValueError(f"x must be an {self.M}-dimensional input")

        x = np.atleast_2d(x)

        # Use the pseudo-determinant, pseudo-inverse, and rank so that the
        # density is also defined for a singular (rank-deficient) Sigma.
        eigenvalues = linalg.eigvals(self.Sigma)
        pseudo_det = np.prod(eigenvalues[eigenvalues > 1e-12])
        inverse_sigma = linalg.pinv(self.Sigma)
        rank_sigma = linalg.matrix_rank(self.Sigma)

        # Quadratic form in (log(x) - mu), summed across dimensions for each row of x.
        log_dev = np.log(x) - self.mu
        quad_form = np.sum(np.multiply(log_dev @ inverse_sigma, log_dev), axis=1)

        pd = (
            (1 / np.prod(x, axis=1))
            * (2 * np.pi) ** (-rank_sigma / 2)
            * pseudo_det ** (-0.5)
            * np.exp(-0.5 * quad_form)
        )

        return pd
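
    # Example (editor's sketch): with a diagonal Sigma the components are
    # independent, so the joint density should factor into the product of the
    # univariate lognormal marginals (values illustrative):
    #
    #     d = MultivariateLogNormal(mu=[0.1, 0.2], Sigma=[[0.5, 0.0], [0.0, 0.3]], seed=0)
    #     x = np.array([[1.0, 1.5], [0.7, 2.0]])
    #     joint = d._pdf(x)
    #     prod_marg = d._marginal(x[:, 0], dim=1) * d._marginal(x[:, 1], dim=2)
    #     assert np.allclose(joint, prod_marg)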

    def _marginal(self, x: Union[np.ndarray, float, list], dim: int):
        """
        Marginal distribution of one of the variables in the bivariate
        distribution evaluated at x.

        Parameters
        ----------
        x : Union[np.ndarray, float]
            Point at which to evaluate the marginal distribution.
        dim : int
            Which of the random variables to evaluate (1 or 2).

        Returns
        -------
        float
            Marginal distribution evaluated at x.
        """
        x = np.asarray(x)

        x_dim = Lognormal(
            mu=self.mu[dim - 1], sigma=np.sqrt(self.Sigma[dim - 1, dim - 1])
        )

        return x_dim.pdf(x)

    def _marginal_cdf(self, x: Union[np.ndarray, float, list], dim: int):
        """
        Cumulative distribution function of one of the variables from the
        bivariate distribution evaluated at x.

        Parameters
        ----------
        x : Union[np.ndarray, float]
            Point at which to evaluate the marginal CDF.
        dim : int
            Which of the random variables to evaluate (1 or 2).

        Returns
        -------
        float
            Marginal CDF evaluated at x.
        """
        x = np.asarray(x)

        x_dim = Lognormal(
            mu=self.mu[dim - 1], sigma=np.sqrt(self.Sigma[dim - 1, dim - 1])
        )

        return x_dim.cdf(x)

    def rvs(self, size: int = 1, random_state=None):
        """
        Random sample from the distribution.

        Parameters
        ----------
        size : int
            Number of data points to generate.
        random_state : optional
            Seed for random number generator.

        Returns
        -------
        np.ndarray
            Random sample from the distribution.
        """
        Z = MultivariateNormal(mu=self.mu, Sigma=self.Sigma)

        return np.exp(Z.rvs(size, random_state=random_state))
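
    # Example (editor's sketch): a Monte Carlo draw should roughly reproduce
    # the analytical mean exp(mu + diag(Sigma) / 2):
    #
    #     d = MultivariateLogNormal(mu=[0.0, 0.5], Sigma=[[0.4, 0.1], [0.1, 0.2]], seed=0)
    #     draws = d.rvs(size=100_000, random_state=0)
    #     draws.mean(axis=0)  # close to d.mean() = [exp(0.2), exp(0.6)]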

    def _approx_equiprobable(
        self,
        N: int,
        tail_bound: Union[float, list, tuple] = None,
        endpoints: bool = False,
        decomp: str = "cholesky",
    ):
        """
        Makes a discrete approximation to this multivariate lognormal
        distribution using the equiprobable method.

        Parameters
        ----------
        N : int
            The number of points in the discrete approximation along each
            dimension; the interior grid has N**M equiprobable atoms.
        tail_bound : Union[float, list, tuple], optional
            The values of the CDF at which the distribution is truncated. If
            only a single number is specified, it is taken as the lower tail
            bound and a symmetric upper bound is chosen. One-tailed
            approximations can be made by using 0.0 or 1.0 as the lower or
            upper bound, respectively. By default, the distribution is not
            truncated.
        endpoints : bool
            If endpoints is True, atoms at the corner points of the truncated
            region are included. By default, endpoints is False and only the
            interior points are included.
        decomp : str in ["cholesky", "sqrt", "eig"], optional
            The method of decomposing the covariance matrix. Available options
            are the Cholesky decomposition, the positive-definite square root,
            and the eigendecomposition. By default, the Cholesky decomposition
            is used. NOTE: The method of decomposition may affect the
            expectations of the discretized distribution along each dimension
            differently.

        Returns
        -------
        d : DiscreteDistribution
            Discrete approximation of this distribution, with the probability
            associated with each point in the array of discrete atoms.
        """
        if endpoints:
            tail_N = 1
        else:
            tail_N = 0

        if decomp not in ["cholesky", "sqrt", "eig"]:
            raise NotImplementedError(
                "Decomposition method must be 'cholesky', 'sqrt' or 'eig'"
            )

        if np.array_equal(self.Sigma, np.diag(np.diag(self.Sigma))):
            # Sigma is diagonal: the components are independent lognormals, so
            # discretize each dimension separately and take the product grid.
            ind_atoms = np.empty((self.M, N + 2 * tail_N))

            for i in range(self.M):
                if self.Sigma[i, i] == 0.0:
                    x_atoms = np.repeat(np.exp(self.mu[i]), N + 2 * tail_N)
                    ind_atoms[i] = x_atoms
                else:
                    x_atoms = (
                        Lognormal(mu=self.mu[i], sigma=np.sqrt(self.Sigma[i, i]))
                        ._approx_equiprobable(
                            N, tail_N=tail_N, tail_bound=tail_bound, endpoints=endpoints
                        )
                        .atoms
                    )
                    ind_atoms[i] = x_atoms

            atoms_list = [ind_atoms[i] for i in range(self.M)]
            atoms = np.stack(
                [ar.flatten() for ar in list(np.meshgrid(*atoms_list))], axis=1
            ).T

            # "interiors" marks, for each atom, whether it lies in a truncated
            # tail along some dimension (nonzero entries) or in the interior.
            interiors = np.empty([self.M, (N + 2 * tail_N) ** (self.M)])

            inners = np.zeros(N + 2 * tail_N)

            if tail_N > 0:
                inners[:tail_N] = [(tail_N - i) for i in range(tail_N)]
                inners[-tail_N:] = [(i + 1) for i in range(tail_N)]

            for i in range(self.M):
                inners_i = [inners for _ in range((N + 2 * tail_N) ** i)]

                interiors[i] = np.repeat(
                    [*inners_i], (N + 2 * tail_N) ** (self.M - (i + 1))
                )

        else:
            # Correlated case: work with the underlying standard normal. Split
            # [0, 1] (or the truncated CDF range) into N equiprobable segments
            # and map the cut points to z-values with the inverse CDF.
            if tail_bound is not None:
                if type(tail_bound) is float:
                    tail_bound = [tail_bound, 1 - tail_bound]

                if tail_bound[0] < 0 or tail_bound[1] > 1:
                    raise ValueError("Tail bounds must be between 0 and 1")

                cdf_cuts = np.linspace(tail_bound[0], tail_bound[1], N + 1)
                int_prob = tail_bound[1] - tail_bound[0]

            else:
                cdf_cuts = np.linspace(0, 1, N + 1)
                int_prob = 1.0

            Z = Normal()

            z_cuts = np.empty(2 * tail_N + N + 1)
            if tail_N > 0:
                z_cuts[0:tail_N] = Z.ppf(cdf_cuts[0])
                z_cuts[-tail_N:] = Z.ppf(cdf_cuts[-1])

            z_cuts[tail_N : tail_N + N + 1] = Z.ppf(cdf_cuts)
            z_bins = [(z_cuts[i], z_cuts[i + 1]) for i in range(N + 2 * tail_N)]

            atoms = np.empty([self.M, (N + (2 * tail_N)) ** self.M])

            interiors = np.empty([self.M, (N + 2 * tail_N) ** (self.M)])

            inners = np.zeros(N + 2 * tail_N)

            if tail_N > 0:
                inners[:tail_N] = [(tail_N - i) for i in range(tail_N)]
                inners[-tail_N:] = [(i + 1) for i in range(tail_N)]

            def eval(params, z):
                # Conditional expectation of exp(params @ z) over a product of
                # z-bins: dimensions whose bin has zero width (tail endpoints)
                # are evaluated at the point itself, while the remaining
                # dimensions with nonzero loadings are integrated in closed
                # form using the error function.
                inds = []
                excl = []

                for j in range(len(z)):
                    if z[j, 0] == z[j, 1]:
                        excl.append(j)
                    elif params[j] != 0.0:
                        inds.append(j)

                dim = len(inds)

                p = np.repeat(params[inds], 2).reshape(dim, 2)

                Z = np.multiply(p, z[inds])

                bounds = ((p**2 - Z) / (np.sqrt(2) * p)).T

                if len(inds) > 0:
                    x_exp = np.prod(
                        -0.5
                        * np.exp(np.square(params[inds]) / 2)
                        * (special.erf(bounds[1]) - special.erf(bounds[0]))
                    )
                else:
                    x_exp = 1

                if len(excl) > 0:
                    x_others = np.prod(np.exp(np.multiply(params[excl], z[excl, 1])))
                else:
                    x_others = 1

                return x_exp * x_others * (N / int_prob) ** dim

            if decomp == "cholesky":
                # Sigma = L L', so x_i = exp(mu_i + sum_{j<=i} L[i, j] z_j)
                # depends only on the first i+1 standard normal factors.
                L = np.linalg.cholesky(self.Sigma)

                for i in range(self.M):
                    mui = self.mu[i]
                    params = L[i, 0 : i + 1]

                    Z_list = [z_bins for _ in range(i + 1)]

                    Z_bins = [np.array(x) for x in list(product(*Z_list))]

                    xi_atoms = []

                    for z_bin in Z_bins:
                        atom = np.exp(mui) * eval(params, z_bin)
                        xi_atoms.append(atom)

                    xi_atoms_arr = np.repeat(
                        xi_atoms, (N + 2 * tail_N) ** (self.M - (i + 1))
                    )

                    inners_i = [inners for _ in range((N + 2 * tail_N) ** i)]

                    interiors[i] = np.repeat(
                        [*inners_i], (N + 2 * tail_N) ** (self.M - (i + 1))
                    )

                    atoms[i] = xi_atoms_arr
            else:
                # Eigendecomposition: A = Q sqrt(Λ) ("eig"), or the symmetric
                # square root A = Q sqrt(Λ) Q' ("sqrt"); here each x_i depends
                # on all M standard normal factors.
                Λ, Q = np.linalg.eig(self.Sigma)

                A = Q @ np.diag(np.sqrt(Λ))

                if decomp == "sqrt":
                    A = A @ Q.T

                for i in range(self.M):
                    mui = self.mu[i]
                    params = A[i]

                    Z_list = [z_bins for _ in range(self.M)]

                    Z_bins = [np.array(x) for x in list(product(*Z_list))]

                    xi_atoms = []

                    for z_bin in Z_bins:
                        atom = np.exp(mui) * eval(params, z_bin)
                        xi_atoms.append(atom)

                    inners_i = [inners for _ in range((N + 2 * tail_N) ** i)]

                    interiors[i] = np.repeat(
                        [*inners_i], (N + 2 * tail_N) ** (self.M - (i + 1))
                    )

                    atoms[i] = xi_atoms

        # Atoms whose largest "interiors" entry is zero lie in the interior
        # grid and receive probability 1 / N**M; tail/corner endpoint atoms
        # receive probability zero.
        max_locs = np.argmax(np.abs(interiors), axis=0)

        max_inds = np.stack([max_locs, np.arange(len(max_locs))], axis=1)

        prob_locs = interiors[max_inds[:, 0], max_inds[:, 1]]

        def prob_assign(x):
            if x == 0:
                return 1 / (N**self.M)
            else:
                return 0.0

        prob_vec = np.vectorize(prob_assign)

        pmv = prob_vec(prob_locs)

        limit = {
            "dist": self,
            "method": "equiprobable",
            "N": N,
            "endpoints": endpoints,
            "tail_bound": tail_bound,
            "decomp": decomp,
        }

        return DiscreteDistribution(
            pmv=pmv,
            atoms=atoms,
            seed=self._rng.integers(0, 2**31 - 1, dtype="int32"),
            limit=limit,
        )
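

if __name__ == "__main__":
    # Editor's usage sketch (not part of the library): build an equiprobable
    # discretization of a correlated bivariate lognormal and inspect it. The
    # parameter values are illustrative; approximations of higher moments can
    # differ slightly across decompositions, as noted in the docstring above.
    d = MultivariateLogNormal(mu=[0.0, 0.0], Sigma=[[0.3, 0.1], [0.1, 0.2]], seed=0)

    approx_chol = d._approx_equiprobable(N=5, decomp="cholesky")
    print(approx_chol.pmv.sum())  # sums to 1: N**M equally weighted atoms
    print(approx_chol.atoms @ approx_chol.pmv)  # approximately d.mean()

    approx_sqrt = d._approx_equiprobable(N=5, decomp="sqrt")
    print(approx_sqrt.atoms @ approx_sqrt.pmv)  # also approximately d.mean()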