Coverage for HARK / distributions / multivariate.py: 93%

175 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-10 06:19 +0000

1from itertools import product 

2from typing import List, Union 

3 

4import numpy as np 

5from numpy import linalg 

6from scipy import special 

7from scipy.stats._multivariate import multi_rv_frozen, multivariate_normal_frozen 

8 

9from HARK.distributions.base import Distribution 

10from HARK.distributions.continuous import Lognormal, Normal 

11from HARK.distributions.discrete import DiscreteDistribution 

12 

13# MULTIVARIATE DISTRIBUTIONS 

14 

15 

16class MultivariateNormal(multivariate_normal_frozen, Distribution): 

17 """ 

18 A Multivariate Normal distribution. 

19 

20 Parameters 

21 ---------- 

22 mu : numpy array 

23 Mean vector. 

24 Sigma : 2-d numpy array. Each dimension must have length equal to that of 

25 mu. 

26 Variance-covariance matrix. 

27 seed : int 

28 Seed for random number generator. 

29 """ 

30 

31 def __init__(self, mu=[1, 1], Sigma=[[1, 0], [0, 1]], seed=None): 

32 self.mu = np.asarray(mu) 

33 self.Sigma = np.asarray(Sigma) 

34 self.M = self.mu.size 

35 multivariate_normal_frozen.__init__(self, mean=self.mu, cov=self.Sigma) 

36 Distribution.__init__(self, seed=seed) 

37 self.infimum = -np.inf * np.ones(self.M) 

38 self.supremum = np.inf * np.ones(self.M) 

39 

40 def discretize(self, N, method="hermite", endpoints=False): 

41 """ 

42 For multivariate normal distributions, the Gauss-Hermite 

43 quadrature rule is used as the default method for discretization. 

44 """ 

45 

46 return self._approx(N, method=method, endpoints=endpoints) 

47 

48 def _approx(self, N, method="hermite", endpoints=False): 

49 """ 

50 Returns a discrete approximation of this distribution. 

51 

52 The discretization will have N**M points, where M is the dimension of 

53 the multivariate normal. 

54 

55 It uses the fact that: 

56 - Being positive definite, Sigma can be factorized as Sigma = QVQ', 

57 with V diagonal. So, letting A=Q*sqrt(V), Sigma = A*A'. 

58 - If Z is an N-dimensional multivariate standard normal, then 

59 A*Z ~ N(0,A*A' = Sigma). 

60 

61 The idea therefore is to construct an equiprobable grid for a standard 

62 normal and multiply it by matrix A. 

63 """ 

64 

65 # Start by computing matrix A. 

66 v, Q = np.linalg.eig(self.Sigma) 

67 sqrtV = np.diag(np.sqrt(v)) 

68 A = np.matmul(Q, sqrtV) 

69 

70 # Now find a discretization for a univariate standard normal. 

71 

72 z_approx = Normal().discretize(N, method=method) 

73 

74 # Now create the multivariate grid and pmv 

75 Z = np.array(list(product(*[z_approx.atoms.flatten()] * self.M))) 

76 pmv = np.prod(np.array(list(product(*[z_approx.pmv] * self.M))), axis=1) 

77 

78 # Apply mean and standard deviation to the Z grid 

79 atoms = self.mu[None, ...] + np.matmul(Z, A.T) 

80 

81 limit = { 

82 "dist": self, 

83 "method": method, 

84 "N": N, 

85 "endpoints": endpoints, 

86 } 

87 

88 # Construct and return discrete distribution 

89 return DiscreteDistribution( 

90 pmv, 

91 atoms.T, 

92 seed=self.random_seed(), 

93 limit=limit, 

94 ) 

95 

96 

97class MultivariateLogNormal(multi_rv_frozen, Distribution): 

98 """ 

99 A Multivariate Lognormal distribution. 

100 

101 Parameters 

102 ---------- 

103 mu : Union[list, numpy.ndarray], optional 

104 Means of underlying multivariate normal, default [0.0, 0.0]. 

105 Sigma : Union[list, numpy.ndarray], optional 

106 nxn variance-covariance matrix of underlying multivariate normal, default [[1.0, 0.0], [0.0, 1.0]]. 

107 seed : int, optional 

108 Seed for random number generator, default 0. 

109 """ 

110 

111 def __init__( 

112 self, 

113 mu: Union[List, np.ndarray] = [0.0, 0.0], 

114 Sigma: Union[List, np.ndarray] = [[1.0, 0.0], [0.0, 1.0]], 

115 seed=None, 

116 ): 

117 self.mu = np.asarray(mu) 

118 self.Sigma = np.asarray(Sigma) 

119 self.M = self.mu.size 

120 

121 if self.Sigma.shape != (self.M, self.M): 

122 raise AttributeError(f"Sigma must be a {self.M}x{self.M} matrix") 

123 

124 if np.array_equal(self.Sigma, self.Sigma.T): 

125 res = np.all(np.linalg.eigvals(self.Sigma) >= 0) 

126 if not res: 

127 raise AttributeError("Sigma must be positive semi-definite") 

128 else: 

129 raise AttributeError("Sigma must be positive semi-definite") 

130 

131 multi_rv_frozen.__init__(self) 

132 Distribution.__init__(self, seed=seed) 

133 self.dstn = MultivariateNormal(mu=self.mu, Sigma=self.Sigma) 

134 

135 def mean(self): 

136 """ 

137 Mean of the distribution. 

138 

139 Returns 

140 ------- 

141 np.ndarray 

142 Mean of the distribution. 

143 """ 

144 

145 return np.exp(self.mu + 0.5 * np.diag(self.Sigma)) 

146 

147 def _cdf(self, x: Union[list, np.ndarray]): 

148 """ 

149 Cumulative distribution function of the distribution evaluated at x. 

150 

151 Parameters 

152 ---------- 

153 x : np.ndarray 

154 Point at which to evaluate the CDF. 

155 

156 Returns 

157 ------- 

158 float 

159 CDF evaluated at x. 

160 """ 

161 

162 x = np.asarray(x) 

163 if (x.shape != self.M) & (x.shape[1] != self.M): 

164 raise ValueError(f"x must be and {self.M}-dimensional input") 

165 return self.dstn.cdf(np.log(x)) 

166 

167 def _pdf(self, x: Union[list, np.ndarray]): 

168 """ 

169 Probability density function of the distribution evaluated at x. 

170 

171 Parameters 

172 ---------- 

173 x : Union[list, np.ndarray] 

174 Point at which to evaluate the PDF. 

175 

176 Returns 

177 ------- 

178 float 

179 PDF evaluated at x. 

180 """ 

181 

182 x = np.asarray(x) 

183 

184 if (x.shape != (self.M,)) & (x.shape[1] != self.M): 

185 raise ValueError(f"x must be an {self.M}-dimensional input") 

186 

187 eigenvalues = linalg.eigvals(self.Sigma) 

188 

189 pseudo_det = np.prod(eigenvalues[eigenvalues > 1e-12]) 

190 

191 inverse_sigma = linalg.pinv(self.Sigma) 

192 

193 rank_sigma = linalg.matrix_rank(self.Sigma) 

194 

195 pd = np.multiply( 

196 (1 / np.prod(x, axis=1, keepdims=True)), 

197 (2 * np.pi) ** (-rank_sigma / 2) 

198 * pseudo_det ** (-0.5) 

199 * np.exp(-(1 / 2) * np.multiply(np.log(x) @ inverse_sigma, np.log(x))), 

200 ) 

201 return pd 

202 

203 def _marginal(self, x: Union[np.ndarray, float, list], dim: int): 

204 """ 

205 Marginal distribution of one of the variables in the bivariate distribution evaluated at x. 

206 

207 Parameters 

208 ---------- 

209 x : Union[np.ndarray, float] 

210 Point at which to evaluate the marginal distribution. 

211 dim : int 

212 Which of the random variables to evaluate. 

213 

214 Returns 

215 ------- 

216 float 

217 Marginal distribution evaluated at x. 

218 """ 

219 

220 x = np.asarray(x) 

221 x_dim = Lognormal(mu=self.mu[dim], sigma=np.sqrt(self.Sigma[dim, dim])) 

222 return x_dim.pdf(x) 

223 

224 def _marginal_cdf(self, x: Union[np.ndarray, float, list], dim: int): 

225 """ 

226 Cumulative distribution function of one of the variables from the bivariate distribution evaluated at x. 

227 

228 Parameters 

229 ---------- 

230 x : Union[np.ndarray, float] 

231 Point at which to evaluate the marginal CDF. 

232 dim : int 

233 Which of the random variables to evaluate. 

234 

235 Returns 

236 ------- 

237 float 

238 Marginal CDF evaluated at x. 

239 """ 

240 

241 x = np.asarray(x) 

242 x_dim = Lognormal(mu=self.mu[dim], sigma=np.sqrt(self.Sigma[dim, dim])) 

243 return x_dim.cdf(x) 

244 

245 def rvs(self, size: int = 1, random_state=None): 

246 """ 

247 Random sample from the distribution. 

248 

249 Parameters 

250 ---------- 

251 size : int 

252 Number of data points to generate. 

253 random_state : optional 

254 Seed for random number generator. 

255 

256 Returns 

257 ------- 

258 np.ndarray 

259 Random sample from the distribution. 

260 """ 

261 return np.exp(self.dstn.rvs(size, random_state=random_state)) 

262 

263 @staticmethod 

264 def _make_inners(N: int, tail_N: int) -> np.ndarray: 

265 """Build the per-dimension ``inners`` index vector used for tail tagging.""" 

266 inners = np.zeros(N + 2 * tail_N) 

267 if tail_N > 0: 

268 inners[:tail_N] = [(tail_N - i) for i in range(tail_N)] 

269 inners[-tail_N:] = [(i + 1) for i in range(tail_N)] 

270 return inners 

271 

272 def _fill_interiors_row( 

273 self, 

274 interiors: np.ndarray, 

275 inners: np.ndarray, 

276 N: int, 

277 tail_N: int, 

278 i: int, 

279 ) -> None: 

280 """Populate ``interiors[i]`` by tiling ``inners`` for dimension ``i``.""" 

281 inners_i = [inners for _ in range((N + 2 * tail_N) ** i)] 

282 interiors[i] = np.repeat([*inners_i], (N + 2 * tail_N) ** (self.M - (i + 1))) 

283 

284 def _diagonal_atoms(self, N, tail_N, tail_bound, endpoints): 

285 """Build the atoms tensor for the case where Sigma is diagonal.""" 

286 ind_atoms = np.empty((self.M, N + 2 * tail_N)) 

287 for i in range(self.M): 

288 if self.Sigma[i, i] == 0.0: 

289 ind_atoms[i] = np.repeat(np.exp(self.mu[i]), N + 2 * tail_N) 

290 else: 

291 ind_atoms[i] = ( 

292 Lognormal(mu=self.mu[i], sigma=np.sqrt(self.Sigma[i, i])) 

293 ._approx_equiprobable( 

294 N, tail_N=tail_N, tail_bound=tail_bound, endpoints=endpoints 

295 ) 

296 .atoms 

297 ) 

298 atoms_list = [ind_atoms[i] for i in range(self.M)] 

299 return np.stack( 

300 [ar.flatten() for ar in list(np.meshgrid(*atoms_list))], axis=1 

301 ).T 

302 

303 @staticmethod 

304 def _compute_z_bins(N, tail_N, tail_bound): 

305 """Return ``(z_bins, int_prob, tail_bound)`` shared by Cholesky/eig decomposition branches. 

306 

307 ``tail_bound`` is returned in normalized list form so that the caller 

308 can store the consistent value in ``limit`` metadata. 

309 """ 

310 if tail_bound is not None: 

311 if isinstance(tail_bound, (float, np.floating)): 

312 tail_bound = [tail_bound, 1 - tail_bound] 

313 if tail_bound[0] < 0 or tail_bound[1] > 1: 

314 raise ValueError("Tail bounds must be between 0 and 1") 

315 cdf_cuts = np.linspace(tail_bound[0], tail_bound[1], N + 1) 

316 int_prob = tail_bound[1] - tail_bound[0] 

317 else: 

318 cdf_cuts = np.linspace(0, 1, N + 1) 

319 int_prob = 1.0 

320 

321 Z = Normal() 

322 z_cuts = np.empty(2 * tail_N + N + 1) 

323 if tail_N > 0: 

324 z_cuts[0:tail_N] = Z.ppf(cdf_cuts[0]) 

325 z_cuts[-tail_N:] = Z.ppf(cdf_cuts[-1]) 

326 z_cuts[tail_N : tail_N + N + 1] = Z.ppf(cdf_cuts) 

327 z_bins = [(z_cuts[i], z_cuts[i + 1]) for i in range(N + 2 * tail_N)] 

328 return z_bins, int_prob, tail_bound 

329 

330 @staticmethod 

331 def _eval_atom_density(params, z, N, int_prob): 

332 """Compute the integral of the lognormal density over a multidimensional z bin.""" 

333 inds = [] 

334 excl = [] 

335 for j in range(len(z)): 

336 if z[j, 0] == z[j, 1]: 

337 excl.append(j) 

338 elif params[j] != 0.0: 

339 inds.append(j) 

340 

341 dim = len(inds) 

342 if dim > 0: 

343 p = np.repeat(params[inds], 2).reshape(dim, 2) 

344 Z = np.multiply(p, z[inds]) 

345 bounds = ((p**2 - Z) / (np.sqrt(2) * p)).T 

346 x_exp = np.prod( 

347 -0.5 

348 * np.exp(np.square(params[inds]) / 2) 

349 * (special.erf(bounds[1]) - special.erf(bounds[0])) 

350 ) 

351 else: 

352 x_exp = 1 

353 

354 if len(excl) > 0: 

355 x_others = np.prod(np.exp(np.multiply(params[excl], z[excl, 1]))) 

356 else: 

357 x_others = 1 

358 

359 return x_exp * x_others * (N / int_prob) ** dim 

360 

361 def _fill_decomp_atoms( 

362 self, decomp, atoms, interiors, inners, z_bins, N, tail_N, int_prob 

363 ): 

364 """Fill the atoms tensor for non-diagonal Sigma using the chosen decomposition.""" 

365 if decomp == "cholesky": 

366 L = np.linalg.cholesky(self.Sigma) 

367 for i in range(self.M): 

368 params = L[i, 0 : i + 1] 

369 num_z_dims = i + 1 

370 xi_atoms = self._row_atoms( 

371 self.mu[i], params, z_bins, num_z_dims, N, int_prob 

372 ) 

373 self._fill_interiors_row(interiors, inners, N, tail_N, i) 

374 atoms[i] = np.repeat(xi_atoms, (N + 2 * tail_N) ** (self.M - (i + 1))) 

375 else: 

376 Lambda, Q = np.linalg.eig(self.Sigma) 

377 A = Q @ np.diag(np.sqrt(Lambda)) 

378 if decomp == "sqrt": 

379 A = A @ Q.T 

380 for i in range(self.M): 

381 params = A[i] 

382 xi_atoms = self._row_atoms( 

383 self.mu[i], params, z_bins, self.M, N, int_prob 

384 ) 

385 self._fill_interiors_row(interiors, inners, N, tail_N, i) 

386 atoms[i] = xi_atoms 

387 

388 @classmethod 

389 def _row_atoms(cls, mu_i, params, z_bins, num_z_dims, N, int_prob): 

390 """Compute the per-row xi atoms by integrating the density over each z bin.""" 

391 Z_bins = [np.array(x) for x in product(*([z_bins] * num_z_dims))] 

392 return [ 

393 np.exp(mu_i) * cls._eval_atom_density(params, z_bin, N, int_prob) 

394 for z_bin in Z_bins 

395 ] 

396 

397 def _approx_equiprobable( 

398 self, 

399 N: int, 

400 endpoints: bool = False, 

401 tail_bound: Union[float, list, tuple] = None, 

402 decomp: str = "cholesky", 

403 ): 

404 """ 

405 Makes a discrete approximation using the equiprobable method to this multi- 

406 variate lognormal distribution. 

407 

408 Parameters 

409 ---------- 

410 N : int 

411 The number of points in the discrete approximation. 

412 tail_bound : Union[float, list, tuple], optional 

413 The values of the CDF according to which the distribution is truncated. 

414 If only a single number is specified, it is the lower tail bound and a 

415 symmetric upper bound is chosen. Can make one-tailed approximations 

416 with 0.0 or 1.0 as the lower and upper bound respectively. By default 

417 the distribution is not truncated. 

418 endpoints : bool 

419 If endpoints is True, then atoms at the corner points of the truncated 

420 region are included. By default, endpoints is False, which is when only 

421 the interior points are included. 

422 decomp : str in ["cholesky", "sqrt", "eig"], optional 

423 The method of decomposing the covariance matrix. Available options are 

424 the Cholesky decomposition, the positive-definite square root, and the 

425 eigendecomposition. By default the Cholesky decomposition is used. 

426 NOTE: The method of decomposition might affect the expectations of the 

427 discretized distribution along each dimension dfferently. 

428 

429 Returns 

430 ------- 

431 d : DiscreteDistribution 

432 Probability associated with each point in array of discrete 

433 points for discrete probability mass function. 

434 """ 

435 

436 tail_N = 1 if endpoints else 0 

437 

438 if decomp not in ["cholesky", "sqrt", "eig"]: 

439 raise NotImplementedError( 

440 "Decomposition method must be 'cholesky', 'sqrt' or 'eig'" 

441 ) 

442 

443 interiors = np.empty([self.M, (N + 2 * tail_N) ** (self.M)]) 

444 inners = self._make_inners(N, tail_N) 

445 

446 if np.array_equal(self.Sigma, np.diag(np.diag(self.Sigma))): 

447 atoms = self._diagonal_atoms(N, tail_N, tail_bound, endpoints) 

448 for i in range(self.M): 

449 self._fill_interiors_row(interiors, inners, N, tail_N, i) 

450 else: 

451 z_bins, int_prob, tail_bound = self._compute_z_bins(N, tail_N, tail_bound) 

452 atoms = np.empty([self.M, (N + (2 * tail_N)) ** self.M]) 

453 self._fill_decomp_atoms( 

454 decomp, atoms, interiors, inners, z_bins, N, tail_N, int_prob 

455 ) 

456 

457 max_locs = np.argmax(np.abs(interiors), axis=0) 

458 

459 max_inds = np.stack([max_locs, np.arange(len(max_locs))], axis=1) 

460 

461 prob_locs = interiors[max_inds[:, 0], max_inds[:, 1]] 

462 

463 def prob_assign(x): 

464 if x == 0: 

465 return 1 / (N**self.M) 

466 else: 

467 return 0.0 

468 

469 prob_vec = np.vectorize(prob_assign) 

470 

471 pmv = prob_vec(prob_locs) 

472 

473 limit = { 

474 "dist": self, 

475 "method": "equiprobable", 

476 "N": N, 

477 "endpoints": endpoints, 

478 "tail_bound": tail_bound, 

479 "decomp": decomp, 

480 } 

481 

482 return DiscreteDistribution( 

483 pmv=pmv, 

484 atoms=atoms, 

485 seed=self.random_seed(), 

486 limit=limit, 

487 )