Coverage for HARK/distributions/multivariate.py: 92%
179 statements
coverage.py v7.12.0, created at 2025-12-07 05:16 +0000

from itertools import product
from typing import List, Union

import numpy as np
from numpy import linalg
from scipy import special
from scipy.stats._multivariate import multi_rv_frozen, multivariate_normal_frozen

from HARK.distributions.base import Distribution
from HARK.distributions.continuous import Lognormal, Normal
from HARK.distributions.discrete import DiscreteDistribution

# MULTIVARIATE DISTRIBUTIONS


class MultivariateNormal(multivariate_normal_frozen, Distribution):
    """
    A Multivariate Normal distribution.

    Parameters
    ----------
    mu : numpy array
        Mean vector.
    Sigma : 2-d numpy array
        Variance-covariance matrix. Each dimension must have length equal to
        that of mu.
    seed : int
        Seed for random number generator.
    """

    def __init__(self, mu=[1, 1], Sigma=[[1, 0], [0, 1]], seed=0):
        self.mu = np.asarray(mu)
        self.Sigma = np.asarray(Sigma)
        self.M = self.mu.size
        multivariate_normal_frozen.__init__(self, mean=self.mu, cov=self.Sigma)
        Distribution.__init__(self, seed=seed)
        self.infimum = -np.inf * np.ones(self.M)
        self.supremum = np.inf * np.ones(self.M)

    def discretize(self, N, method="hermite", endpoints=False):
        """
        For multivariate normal distributions, the Gauss-Hermite
        quadrature rule is used as the default method for discretization.
        """

        return self._approx(N, method=method, endpoints=endpoints)

    def _approx(self, N, method="hermite", endpoints=False):
        """
        Returns a discrete approximation of this distribution.

        The discretization will have N**M points, where M is the dimension of
        the multivariate normal.

        It uses the fact that:
        - Being positive definite, Sigma can be factorized as Sigma = QVQ',
          with V diagonal. So, letting A = Q*sqrt(V), Sigma = A*A'.
        - If Z is an M-dimensional multivariate standard normal, then
          A*Z ~ N(0, A*A' = Sigma).

        The idea therefore is to construct a grid for a univariate standard
        normal with the chosen method, take its M-fold product, and multiply
        the result by the matrix A.
        """

        # Start by computing matrix A.
        v, Q = np.linalg.eig(self.Sigma)
        sqrtV = np.diag(np.sqrt(v))
        A = np.matmul(Q, sqrtV)

        # Now find a discretization for a univariate standard normal.
        z_approx = Normal().discretize(N, method=method)

        # Now create the multivariate grid and pmv
        Z = np.array(list(product(*[z_approx.atoms.flatten()] * self.M)))
        pmv = np.prod(np.array(list(product(*[z_approx.pmv] * self.M))), axis=1)

        # Apply mean and standard deviation to the Z grid
        atoms = self.mu[None, ...] + np.matmul(Z, A.T)

        limit = {
            "dist": self,
            "method": method,
            "N": N,
            "endpoints": endpoints,
        }

        # Construct and return discrete distribution
        return DiscreteDistribution(
            pmv,
            atoms.T,
            seed=self._rng.integers(0, 2**31 - 1, dtype="int32"),
            limit=limit,
        )
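

# Illustrative usage sketch (not part of the original HARK source): discretize
# a bivariate normal with the default Gauss-Hermite rule and check that the
# probability-weighted mean and covariance of the atoms recover mu and Sigma,
# up to floating-point error. The parameter values and the helper name below
# are arbitrary.
def _example_multivariate_normal_discretization():
    mu = np.array([1.0, -0.5])
    Sigma = np.array([[1.0, 0.3], [0.3, 0.5]])
    dist = MultivariateNormal(mu=mu, Sigma=Sigma, seed=0)

    # N points per dimension yield N**M atoms; atoms has shape (M, N**M).
    approx = dist.discretize(7)

    # Probability-weighted mean of the atoms: should match mu, since
    # Gauss-Hermite quadrature integrates low-order polynomials exactly.
    disc_mean = approx.atoms @ approx.pmv

    # Probability-weighted covariance of the atoms: should match Sigma.
    dev = approx.atoms - disc_mean[:, None]
    disc_cov = (dev * approx.pmv) @ dev.T

    assert np.allclose(disc_mean, mu)
    assert np.allclose(disc_cov, Sigma)
    return approx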


class MultivariateLogNormal(multi_rv_frozen, Distribution):
    """
    A Multivariate Lognormal distribution.

    Parameters
    ----------
    mu : Union[list, numpy.ndarray], optional
        Means of underlying multivariate normal, default [0.0, 0.0].
    Sigma : Union[list, numpy.ndarray], optional
        nxn variance-covariance matrix of underlying multivariate normal,
        default [[1.0, 0.0], [0.0, 1.0]].
    seed : int, optional
        Seed for random number generator, default None.
    """

    def __init__(
        self,
        mu: Union[List, np.ndarray] = [0.0, 0.0],
        Sigma: Union[List, np.ndarray] = [[1.0, 0.0], [0.0, 1.0]],
        seed=None,
    ):
        self.mu = np.asarray(mu)
        self.Sigma = np.asarray(Sigma)
        self.M = self.mu.size

        if self.Sigma.shape != (self.M, self.M):
            raise AttributeError(f"Sigma must be a {self.M}x{self.M} matrix")

        if np.array_equal(self.Sigma, self.Sigma.T):
            res = np.all(np.linalg.eigvals(self.Sigma) >= 0)
            if not res:
                raise AttributeError("Sigma must be positive semi-definite")
        else:
            raise AttributeError("Sigma must be a symmetric matrix")

        multi_rv_frozen.__init__(self)
        Distribution.__init__(self, seed=seed)
        self.dstn = MultivariateNormal(mu=self.mu, Sigma=self.Sigma)

    def mean(self):
        """
        Mean of the distribution.

        Returns
        -------
        np.ndarray
            Mean of the distribution.
        """

        # For X = exp(Z) with Z ~ N(mu, Sigma), E[X_i] = exp(mu_i + Sigma_ii / 2).
        return np.exp(self.mu + 0.5 * np.diag(self.Sigma))

    def _cdf(self, x: Union[list, np.ndarray]):
        """
        Cumulative distribution function of the distribution evaluated at x.

        Parameters
        ----------
        x : np.ndarray
            Point at which to evaluate the CDF.

        Returns
        -------
        float
            CDF evaluated at x.
        """

        x = np.asarray(x)
        if x.shape != (self.M,) and (x.ndim != 2 or x.shape[1] != self.M):
            raise ValueError(f"x must be an {self.M}-dimensional input")
        return self.dstn.cdf(np.log(x))

    def _pdf(self, x: Union[list, np.ndarray]):
        """
        Probability density function of the distribution evaluated at x.

        Parameters
        ----------
        x : Union[list, np.ndarray]
            Point at which to evaluate the PDF.

        Returns
        -------
        float
            PDF evaluated at x.
        """

        x = np.atleast_2d(x)

        if x.shape[1] != self.M:
            raise ValueError(f"x must be an {self.M}-dimensional input")

        # Use the pseudo-determinant and pseudo-inverse so that degenerate
        # (singular) covariance matrices are handled as well.
        eigenvalues = linalg.eigvals(self.Sigma)

        pseudo_det = np.prod(eigenvalues[eigenvalues > 1e-12])

        inverse_sigma = linalg.pinv(self.Sigma)

        rank_sigma = linalg.matrix_rank(self.Sigma)

        # Quadratic form (log x - mu)' Sigma^+ (log x - mu), one value per row of x.
        log_dev = np.log(x) - self.mu
        quad_form = np.sum(
            np.multiply(log_dev @ inverse_sigma, log_dev), axis=1, keepdims=True
        )

        pd = np.multiply(
            (1 / np.prod(x, axis=1, keepdims=True)),
            (2 * np.pi) ** (-rank_sigma / 2)
            * pseudo_det ** (-0.5)
            * np.exp(-0.5 * quad_form),
        )
        return pd

    def _marginal(self, x: Union[np.ndarray, float, list], dim: int):
        """
        Marginal PDF of one of the variables in the multivariate distribution,
        evaluated at x.

        Parameters
        ----------
        x : Union[np.ndarray, float]
            Point at which to evaluate the marginal PDF.
        dim : int
            Which of the random variables to evaluate.

        Returns
        -------
        float
            Marginal PDF evaluated at x.
        """

        x = np.asarray(x)
        x_dim = Lognormal(mu=self.mu[dim], sigma=np.sqrt(self.Sigma[dim, dim]))
        return x_dim.pdf(x)

    def _marginal_cdf(self, x: Union[np.ndarray, float, list], dim: int):
        """
        Marginal CDF of one of the variables in the multivariate distribution,
        evaluated at x.

        Parameters
        ----------
        x : Union[np.ndarray, float]
            Point at which to evaluate the marginal CDF.
        dim : int
            Which of the random variables to evaluate.

        Returns
        -------
        float
            Marginal CDF evaluated at x.
        """

        x = np.asarray(x)
        x_dim = Lognormal(mu=self.mu[dim], sigma=np.sqrt(self.Sigma[dim, dim]))
        return x_dim.cdf(x)

    def rvs(self, size: int = 1, random_state=None):
        """
        Random sample from the distribution.

        Parameters
        ----------
        size : int
            Number of data points to generate.
        random_state : optional
            Seed for random number generator.

        Returns
        -------
        np.ndarray
            Random sample from the distribution.
        """
        return np.exp(self.dstn.rvs(size, random_state=random_state))
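
    # Note (illustrative, not part of the original source): since X = exp(Z)
    # with Z ~ N(mu, Sigma), each component has E[X_i] = exp(mu_i + Sigma_ii / 2),
    # which is what ``mean`` above returns and what a large sample from ``rvs``
    # should approximate, e.g.
    #
    #     d = MultivariateLogNormal(mu=[0.0, 0.1], Sigma=[[0.2, 0.05], [0.05, 0.1]])
    #     d.rvs(size=200_000).mean(axis=0)  # roughly d.mean()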

    def _approx_equiprobable(
        self,
        N: int,
        endpoints: bool = False,
        tail_bound: Union[float, list, tuple] = None,
        decomp: str = "cholesky",
    ):
        """
        Makes a discrete approximation of this multivariate lognormal
        distribution using the equiprobable method.

        Parameters
        ----------
        N : int
            The number of points in the discrete approximation.
        endpoints : bool
            If endpoints is True, then atoms at the corner points of the
            truncated region are included. By default, endpoints is False,
            in which case only the interior points are included.
        tail_bound : Union[float, list, tuple], optional
            The values of the CDF at which the distribution is truncated. If
            only a single number is specified, it is taken as the lower tail
            bound and a symmetric upper bound is chosen. One-tailed
            approximations can be made by using 0.0 or 1.0 as the lower or
            upper bound, respectively. By default the distribution is not
            truncated.
        decomp : str in ["cholesky", "sqrt", "eig"], optional
            The method of decomposing the covariance matrix. Available options
            are the Cholesky decomposition, the positive-definite square root,
            and the eigendecomposition. By default the Cholesky decomposition
            is used.
            NOTE: The method of decomposition might affect the expectations of
            the discretized distribution along each dimension differently.

        Returns
        -------
        d : DiscreteDistribution
            Probability associated with each point in array of discrete
            points for discrete probability mass function.
        """

        if endpoints:
            tail_N = 1
        else:
            tail_N = 0

        if decomp not in ["cholesky", "sqrt", "eig"]:
            raise NotImplementedError(
                "Decomposition method must be 'cholesky', 'sqrt' or 'eig'"
            )

        if np.array_equal(self.Sigma, np.diag(np.diag(self.Sigma))):
            # Diagonal Sigma: the components are independent, so discretize
            # each marginal lognormal separately and combine the grids.
            ind_atoms = np.empty((self.M, N + 2 * tail_N))

            for i in range(self.M):
                if self.Sigma[i, i] == 0.0:
                    x_atoms = np.repeat(np.exp(self.mu[i]), N + 2 * tail_N)
                    ind_atoms[i] = x_atoms
                else:
                    x_atoms = (
                        Lognormal(mu=self.mu[i], sigma=np.sqrt(self.Sigma[i, i]))
                        ._approx_equiprobable(
                            N, tail_N=tail_N, tail_bound=tail_bound, endpoints=endpoints
                        )
                        .atoms
                    )
                    ind_atoms[i] = x_atoms

            atoms_list = [ind_atoms[i] for i in range(self.M)]
            atoms = np.stack(
                [ar.flatten() for ar in list(np.meshgrid(*atoms_list))], axis=1
            ).T

            # "interiors" flags, for each dimension, which atoms sit at a tail
            # endpoint; only atoms that are interior in every dimension get
            # positive probability mass below.
            interiors = np.empty([self.M, (N + 2 * tail_N) ** (self.M)])

            inners = np.zeros(N + 2 * tail_N)

            if tail_N > 0:
                inners[:tail_N] = [(tail_N - i) for i in range(tail_N)]
                inners[-tail_N:] = [(i + 1) for i in range(tail_N)]

            for i in range(self.M):
                inners_i = [inners for _ in range((N + 2 * tail_N) ** i)]

                interiors[i] = np.repeat(
                    [*inners_i], (N + 2 * tail_N) ** (self.M - (i + 1))
                )

        else:
            if tail_bound is not None:
                if type(tail_bound) is float:
                    tail_bound = [tail_bound, 1 - tail_bound]

                if tail_bound[0] < 0 or tail_bound[1] > 1:
                    raise ValueError("Tail bounds must be between 0 and 1")

                cdf_cuts = np.linspace(tail_bound[0], tail_bound[1], N + 1)
                int_prob = tail_bound[1] - tail_bound[0]

            else:
                cdf_cuts = np.linspace(0, 1, N + 1)
                int_prob = 1.0

            Z = Normal()

            z_cuts = np.empty(2 * tail_N + N + 1)
            if tail_N > 0:
                z_cuts[0:tail_N] = Z.ppf(cdf_cuts[0])
                z_cuts[-tail_N:] = Z.ppf(cdf_cuts[-1])

            z_cuts[tail_N : tail_N + N + 1] = Z.ppf(cdf_cuts)
            z_bins = [(z_cuts[i], z_cuts[i + 1]) for i in range(N + 2 * tail_N)]

            atoms = np.empty([self.M, (N + (2 * tail_N)) ** self.M])

            interiors = np.empty([self.M, (N + 2 * tail_N) ** (self.M)])

            inners = np.zeros(N + 2 * tail_N)

            if tail_N > 0:
                inners[:tail_N] = [(tail_N - i) for i in range(tail_N)]
                inners[-tail_N:] = [(i + 1) for i in range(tail_N)]

            def eval(params, z):
                # Closed-form conditional expectation of exp(params @ z) over
                # the rectangular bin z, using the error function; zero-width
                # (endpoint) bins are treated as point masses.
                inds = []
                excl = []

                for j in range(len(z)):
                    if z[j, 0] == z[j, 1]:
                        excl.append(j)
                    elif params[j] != 0.0:
                        inds.append(j)

                dim = len(inds)

                p = np.repeat(params[inds], 2).reshape(dim, 2)

                Z = np.multiply(p, z[inds])

                bounds = ((p**2 - Z) / (np.sqrt(2) * p)).T

                if len(inds) > 0:
                    x_exp = np.prod(
                        -0.5
                        * np.exp(np.square(params[inds]) / 2)
                        * (special.erf(bounds[1]) - special.erf(bounds[0]))
                    )
                else:
                    x_exp = 1

                if len(excl) > 0:
                    x_others = np.prod(np.exp(np.multiply(params[excl], z[excl, 1])))
                else:
                    x_others = 1

                return x_exp * x_others * (N / int_prob) ** dim

            if decomp == "cholesky":
                L = np.linalg.cholesky(self.Sigma)

                for i in range(self.M):
                    mui = self.mu[i]
                    params = L[i, 0 : i + 1]

                    Z_list = [z_bins for _ in range(i + 1)]

                    Z_bins = [np.array(x) for x in list(product(*Z_list))]

                    xi_atoms = []

                    for z_bin in Z_bins:
                        atom = np.exp(mui) * eval(params, z_bin)
                        xi_atoms.append(atom)

                    xi_atoms_arr = np.repeat(
                        xi_atoms, (N + 2 * tail_N) ** (self.M - (i + 1))
                    )

                    inners_i = [inners for _ in range((N + 2 * tail_N) ** i)]

                    interiors[i] = np.repeat(
                        [*inners_i], (N + 2 * tail_N) ** (self.M - (i + 1))
                    )

                    atoms[i] = xi_atoms_arr
            else:
                Λ, Q = np.linalg.eig(self.Sigma)

                A = Q @ np.diag(np.sqrt(Λ))

                if decomp == "sqrt":
                    A = A @ Q.T

                for i in range(self.M):
                    mui = self.mu[i]
                    params = A[i]

                    Z_list = [z_bins for _ in range(self.M)]

                    Z_bins = [np.array(x) for x in list(product(*Z_list))]

                    xi_atoms = []

                    for z_bin in Z_bins:
                        atom = np.exp(mui) * eval(params, z_bin)
                        xi_atoms.append(atom)

                    inners_i = [inners for _ in range((N + 2 * tail_N) ** i)]

                    interiors[i] = np.repeat(
                        [*inners_i], (N + 2 * tail_N) ** (self.M - (i + 1))
                    )

                    atoms[i] = xi_atoms

        # Assign equal probability 1 / N**M to every atom that is interior in
        # all dimensions; atoms involving a tail endpoint get zero mass.
        max_locs = np.argmax(np.abs(interiors), axis=0)

        max_inds = np.stack([max_locs, np.arange(len(max_locs))], axis=1)

        prob_locs = interiors[max_inds[:, 0], max_inds[:, 1]]

        def prob_assign(x):
            if x == 0:
                return 1 / (N**self.M)
            else:
                return 0.0

        prob_vec = np.vectorize(prob_assign)

        pmv = prob_vec(prob_locs)

        limit = {
            "dist": self,
            "method": "equiprobable",
            "N": N,
            "endpoints": endpoints,
            "tail_bound": tail_bound,
            "decomp": decomp,
        }

        return DiscreteDistribution(
            pmv=pmv,
            atoms=atoms,
            seed=self._rng.integers(0, 2**31 - 1, dtype="int32"),
            limit=limit,
        )
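

# Illustrative usage sketch (not part of the original HARK source): build a
# bivariate lognormal, compare its analytic mean with the mean of an
# equiprobable discretization, and note that the decomposition choice
# ("cholesky", "sqrt" or "eig") can move the discretized mean along each
# dimension. The parameter values and the helper name below are arbitrary.
def _example_multivariate_lognormal_discretization():
    dist = MultivariateLogNormal(
        mu=[0.0, 0.1], Sigma=[[0.20, 0.05], [0.05, 0.10]], seed=0
    )

    # Analytic mean: exp(mu_i + Sigma_ii / 2) for each component.
    analytic_mean = dist.mean()

    # Equiprobable discretization with N**M equally weighted atoms.
    approx = dist._approx_equiprobable(10, decomp="cholesky")
    disc_mean = approx.atoms @ approx.pmv

    # The two means should be close, though how well they match along each
    # dimension can depend on the decomposition used.
    print("analytic:", analytic_mean)
    print("discretized:", disc_mean)
    return approx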