Coverage for HARK / distributions / multivariate.py: 92%

179 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-07 05:16 +0000

1from itertools import product 

2from typing import List, Union 

3 

4import numpy as np 

5from numpy import linalg 

6from scipy import special 

7from scipy.stats._multivariate import multi_rv_frozen, multivariate_normal_frozen 

8 

9from HARK.distributions.base import Distribution 

10from HARK.distributions.continuous import Lognormal, Normal 

11from HARK.distributions.discrete import DiscreteDistribution 

12 

13# MULTIVARIATE DISTRIBUTIONS 

14 

15 

16class MultivariateNormal(multivariate_normal_frozen, Distribution): 

17 """ 

18 A Multivariate Normal distribution. 

19 

20 Parameters 

21 ---------- 

22 mu : numpy array 

23 Mean vector. 

24 Sigma : 2-d numpy array. Each dimension must have length equal to that of 

25 mu. 

26 Variance-covariance matrix. 

27 seed : int 

28 Seed for random number generator. 

29 """ 

30 

31 def __init__(self, mu=[1, 1], Sigma=[[1, 0], [0, 1]], seed=0): 

32 self.mu = np.asarray(mu) 

33 self.Sigma = np.asarray(Sigma) 

34 self.M = self.mu.size 

35 multivariate_normal_frozen.__init__(self, mean=self.mu, cov=self.Sigma) 

36 Distribution.__init__(self, seed=seed) 

37 self.infimum = -np.inf * np.ones(self.M) 

38 self.supremum = np.inf * np.ones(self.M) 

39 

40 def discretize(self, N, method="hermite", endpoints=False): 

41 """ 

42 For multivariate normal distributions, the Gauss-Hermite 

43 quadrature rule is used as the default method for discretization. 

44 """ 

45 

46 return self._approx(N, method=method, endpoints=endpoints) 

47 

48 def _approx(self, N, method="hermite", endpoints=False): 

49 """ 

50 Returns a discrete approximation of this distribution. 

51 

52 The discretization will have N**M points, where M is the dimension of 

53 the multivariate normal. 

54 

55 It uses the fact that: 

56 - Being positive definite, Sigma can be factorized as Sigma = QVQ', 

57 with V diagonal. So, letting A=Q*sqrt(V), Sigma = A*A'. 

58 - If Z is an N-dimensional multivariate standard normal, then 

59 A*Z ~ N(0,A*A' = Sigma). 

60 

61 The idea therefore is to construct an equiprobable grid for a standard 

62 normal and multiply it by matrix A. 

63 """ 

64 

65 # Start by computing matrix A. 

66 v, Q = np.linalg.eig(self.Sigma) 

67 sqrtV = np.diag(np.sqrt(v)) 

68 A = np.matmul(Q, sqrtV) 

69 

70 # Now find a discretization for a univariate standard normal. 

71 

72 z_approx = Normal().discretize(N, method=method) 

73 

74 # Now create the multivariate grid and pmv 

75 Z = np.array(list(product(*[z_approx.atoms.flatten()] * self.M))) 

76 pmv = np.prod(np.array(list(product(*[z_approx.pmv] * self.M))), axis=1) 

77 

78 # Apply mean and standard deviation to the Z grid 

79 atoms = self.mu[None, ...] + np.matmul(Z, A.T) 

80 

81 limit = { 

82 "dist": self, 

83 "method": method, 

84 "N": N, 

85 "endpoints": endpoints, 

86 } 

87 

88 # Construct and return discrete distribution 

89 return DiscreteDistribution( 

90 pmv, 

91 atoms.T, 

92 seed=self._rng.integers(0, 2**31 - 1, dtype="int32"), 

93 limit=limit, 

94 ) 

95 

96 

97class MultivariateLogNormal(multi_rv_frozen, Distribution): 

98 """ 

99 A Multivariate Lognormal distribution. 

100 

101 Parameters 

102 ---------- 

103 mu : Union[list, numpy.ndarray], optional 

104 Means of underlying multivariate normal, default [0.0, 0.0]. 

105 Sigma : Union[list, numpy.ndarray], optional 

106 nxn variance-covariance matrix of underlying multivariate normal, default [[1.0, 0.0], [0.0, 1.0]]. 

107 seed : int, optional 

108 Seed for random number generator, default 0. 

109 """ 

110 

111 def __init__( 

112 self, 

113 mu: Union[List, np.ndarray] = [0.0, 0.0], 

114 Sigma: Union[List, np.ndarray] = [[1.0, 0.0], [0.0, 1.0]], 

115 seed=None, 

116 ): 

117 self.mu = np.asarray(mu) 

118 self.Sigma = np.asarray(Sigma) 

119 self.M = self.mu.size 

120 

121 if self.Sigma.shape != (self.M, self.M): 

122 raise AttributeError(f"Sigma must be a {self.M}x{self.M} matrix") 

123 

124 if np.array_equal(self.Sigma, self.Sigma.T): 

125 res = np.all(np.linalg.eigvals(self.Sigma) >= 0) 

126 if not res: 

127 raise AttributeError("Sigma must be positive semi-definite") 

128 else: 

129 raise AttributeError("Sigma must be positive semi-definite") 

130 

131 multi_rv_frozen.__init__(self) 

132 Distribution.__init__(self, seed=seed) 

133 self.dstn = MultivariateNormal(mu=self.mu, Sigma=self.Sigma) 

134 

135 def mean(self): 

136 """ 

137 Mean of the distribution. 

138 

139 Returns 

140 ------- 

141 np.ndarray 

142 Mean of the distribution. 

143 """ 

144 

145 return np.exp(self.mu + 0.5 * np.diag(self.Sigma)) 

146 

147 def _cdf(self, x: Union[list, np.ndarray]): 

148 """ 

149 Cumulative distribution function of the distribution evaluated at x. 

150 

151 Parameters 

152 ---------- 

153 x : np.ndarray 

154 Point at which to evaluate the CDF. 

155 

156 Returns 

157 ------- 

158 float 

159 CDF evaluated at x. 

160 """ 

161 

162 x = np.asarray(x) 

163 if (x.shape != self.M) & (x.shape[1] != self.M): 

164 raise ValueError(f"x must be and {self.M}-dimensional input") 

165 return self.dstn.cdf(np.log(x)) 

166 

167 def _pdf(self, x: Union[list, np.ndarray]): 

168 """ 

169 Probability density function of the distribution evaluated at x. 

170 

171 Parameters 

172 ---------- 

173 x : Union[list, np.ndarray] 

174 Point at which to evaluate the PDF. 

175 

176 Returns 

177 ------- 

178 float 

179 PDF evaluated at x. 

180 """ 

181 

182 x = np.asarray(x) 

183 

184 if (x.shape != (self.M,)) & (x.shape[1] != self.M): 

185 raise ValueError(f"x must be an {self.M}-dimensional input") 

186 

187 eigenvalues = linalg.eigvals(self.Sigma) 

188 

189 pseudo_det = np.prod(eigenvalues[eigenvalues > 1e-12]) 

190 

191 inverse_sigma = linalg.pinv(self.Sigma) 

192 

193 rank_sigma = linalg.matrix_rank(self.Sigma) 

194 

195 pd = np.multiply( 

196 (1 / np.prod(x, axis=1, keepdims=True)), 

197 (2 * np.pi) ** (-rank_sigma / 2) 

198 * pseudo_det ** (-0.5) 

199 * np.exp(-(1 / 2) * np.multiply(np.log(x) @ inverse_sigma, np.log(x))), 

200 ) 

201 return pd 

202 

203 def _marginal(self, x: Union[np.ndarray, float, list], dim: int): 

204 """ 

205 Marginal distribution of one of the variables in the bivariate distribution evaluated at x. 

206 

207 Parameters 

208 ---------- 

209 x : Union[np.ndarray, float] 

210 Point at which to evaluate the marginal distribution. 

211 dim : int 

212 Which of the random variables to evaluate. 

213 

214 Returns 

215 ------- 

216 float 

217 Marginal distribution evaluated at x. 

218 """ 

219 

220 x = np.asarray(x) 

221 x_dim = Lognormal(mu=self.mu[dim], sigma=np.sqrt(self.Sigma[dim, dim])) 

222 return x_dim.pdf(x) 

223 

224 def _marginal_cdf(self, x: Union[np.ndarray, float, list], dim: int): 

225 """ 

226 Cumulative distribution function of one of the variables from the bivariate distribution evaluated at x. 

227 

228 Parameters 

229 ---------- 

230 x : Union[np.ndarray, float] 

231 Point at which to evaluate the marginal CDF. 

232 dim : int 

233 Which of the random variables to evaluate. 

234 

235 Returns 

236 ------- 

237 float 

238 Marginal CDF evaluated at x. 

239 """ 

240 

241 x = np.asarray(x) 

242 x_dim = Lognormal(mu=self.mu[dim], sigma=np.sqrt(self.Sigma[dim, dim])) 

243 return x_dim.cdf(x) 

244 

245 def rvs(self, size: int = 1, random_state=None): 

246 """ 

247 Random sample from the distribution. 

248 

249 Parameters 

250 ---------- 

251 size : int 

252 Number of data points to generate. 

253 random_state : optional 

254 Seed for random number generator. 

255 

256 Returns 

257 ------- 

258 np.ndarray 

259 Random sample from the distribution. 

260 """ 

261 return np.exp(self.dstn.rvs(size, random_state=random_state)) 

262 

263 def _approx_equiprobable( 

264 self, 

265 N: int, 

266 endpoints: bool = False, 

267 tail_bound: Union[float, list, tuple] = None, 

268 decomp: str = "cholesky", 

269 ): 

270 """ 

271 Makes a discrete approximation using the equiprobable method to this multi- 

272 variate lognormal distribution. 

273 

274 Parameters 

275 ---------- 

276 N : int 

277 The number of points in the discrete approximation. 

278 tail_bound : Union[float, list, tuple], optional 

279 The values of the CDF according to which the distribution is truncated. 

280 If only a single number is specified, it is the lower tail bound and a 

281 symmetric upper bound is chosen. Can make one-tailed approximations 

282 with 0.0 or 1.0 as the lower and upper bound respectively. By default 

283 the distribution is not truncated. 

284 endpoints : bool 

285 If endpoints is True, then atoms at the corner points of the truncated 

286 region are included. By default, endpoints is False, which is when only 

287 the interior points are included. 

288 decomp : str in ["cholesky", "sqrt", "eig"], optional 

289 The method of decomposing the covariance matrix. Available options are 

290 the Cholesky decomposition, the positive-definite square root, and the 

291 eigendecomposition. By default the Cholesky decomposition is used. 

292 NOTE: The method of decomposition might affect the expectations of the 

293 discretized distribution along each dimension dfferently. 

294 

295 Returns 

296 ------- 

297 d : DiscreteDistribution 

298 Probability associated with each point in array of discrete 

299 points for discrete probability mass function. 

300 """ 

301 

302 if endpoints: 

303 tail_N = 1 

304 else: 

305 tail_N = 0 

306 

307 if decomp not in ["cholesky", "sqrt", "eig"]: 

308 raise NotImplementedError( 

309 "Decomposition method must be 'cholesky', 'sqrt' or 'eig'" 

310 ) 

311 

312 if np.array_equal(self.Sigma, np.diag(np.diag(self.Sigma))): 

313 ind_atoms = np.empty((self.M, N + 2 * tail_N)) 

314 

315 for i in range(self.M): 

316 if self.Sigma[i, i] == 0.0: 

317 x_atoms = np.repeat(np.exp(self.mu[i]), N + 2 * tail_N) 

318 ind_atoms[i] = x_atoms 

319 else: 

320 x_atoms = ( 

321 Lognormal(mu=self.mu[i], sigma=np.sqrt(self.Sigma[i, i])) 

322 ._approx_equiprobable( 

323 N, tail_N=tail_N, tail_bound=tail_bound, endpoints=endpoints 

324 ) 

325 .atoms 

326 ) 

327 ind_atoms[i] = x_atoms 

328 

329 atoms_list = [ind_atoms[i] for i in range(self.M)] 

330 atoms = np.stack( 

331 [ar.flatten() for ar in list(np.meshgrid(*atoms_list))], axis=1 

332 ).T 

333 

334 interiors = np.empty([self.M, (N + 2 * tail_N) ** (self.M)]) 

335 

336 inners = np.zeros(N + 2 * tail_N) 

337 

338 if tail_N > 0: 

339 inners[:tail_N] = [(tail_N - i) for i in range(tail_N)] 

340 inners[-tail_N:] = [(i + 1) for i in range(tail_N)] 

341 

342 for i in range(self.M): 

343 inners_i = [inners for _ in range((N + 2 * tail_N) ** i)] 

344 

345 interiors[i] = np.repeat( 

346 [*inners_i], (N + 2 * tail_N) ** (self.M - (i + 1)) 

347 ) 

348 

349 else: 

350 if tail_bound is not None: 

351 if type(tail_bound) is float: 

352 tail_bound = [tail_bound, 1 - tail_bound] 

353 

354 if tail_bound[0] < 0 or tail_bound[1] > 1: 

355 raise ValueError("Tail bounds must be between 0 and 1") 

356 

357 cdf_cuts = np.linspace(tail_bound[0], tail_bound[1], N + 1) 

358 int_prob = tail_bound[1] - tail_bound[0] 

359 

360 else: 

361 cdf_cuts = np.linspace(0, 1, N + 1) 

362 int_prob = 1.0 

363 

364 Z = Normal() 

365 

366 z_cuts = np.empty(2 * tail_N + N + 1) 

367 if tail_N > 0: 

368 z_cuts[0:tail_N] = Z.ppf(cdf_cuts[0]) 

369 z_cuts[-tail_N:] = Z.ppf(cdf_cuts[-1]) 

370 

371 z_cuts[tail_N : tail_N + N + 1] = Z.ppf(cdf_cuts) 

372 z_bins = [(z_cuts[i], z_cuts[i + 1]) for i in range(N + 2 * tail_N)] 

373 

374 atoms = np.empty([self.M, (N + (2 * tail_N)) ** self.M]) 

375 

376 interiors = np.empty([self.M, (N + 2 * tail_N) ** (self.M)]) 

377 

378 inners = np.zeros(N + 2 * tail_N) 

379 

380 if tail_N > 0: 

381 inners[:tail_N] = [(tail_N - i) for i in range(tail_N)] 

382 inners[-tail_N:] = [(i + 1) for i in range(tail_N)] 

383 

384 def eval(params, z): 

385 inds = [] 

386 excl = [] 

387 

388 for j in range(len(z)): 

389 if z[j, 0] == z[j, 1]: 

390 excl.append(j) 

391 elif params[j] != 0.0: 

392 inds.append(j) 

393 

394 dim = len(inds) 

395 

396 p = np.repeat(params[inds], 2).reshape(dim, 2) 

397 

398 Z = np.multiply(p, z[inds]) 

399 

400 bounds = ((p**2 - Z) / (np.sqrt(2) * p)).T 

401 

402 if len(inds) > 0: 

403 x_exp = np.prod( 

404 -0.5 

405 * np.exp(np.square(params[inds]) / 2) 

406 * (special.erf(bounds[1]) - special.erf(bounds[0])) 

407 ) 

408 else: 

409 x_exp = 1 

410 

411 if len(excl) > 0: 

412 x_others = np.prod(np.exp(np.multiply(params[excl], z[excl, 1]))) 

413 else: 

414 x_others = 1 

415 

416 return x_exp * x_others * (N / int_prob) ** dim 

417 

418 if decomp == "cholesky": 

419 L = np.linalg.cholesky(self.Sigma) 

420 

421 for i in range(self.M): 

422 mui = self.mu[i] 

423 params = L[i, 0 : i + 1] 

424 

425 Z_list = [z_bins for _ in range(i + 1)] 

426 

427 Z_bins = [np.array(x) for x in list(product(*Z_list))] 

428 

429 xi_atoms = [] 

430 

431 for z_bin in Z_bins: 

432 atom = np.exp(mui) * eval(params, z_bin) 

433 xi_atoms.append(atom) 

434 

435 xi_atoms_arr = np.repeat( 

436 xi_atoms, (N + 2 * tail_N) ** (self.M - (i + 1)) 

437 ) 

438 

439 inners_i = [inners for _ in range((N + 2 * tail_N) ** i)] 

440 

441 interiors[i] = np.repeat( 

442 [*inners_i], (N + 2 * tail_N) ** (self.M - (i + 1)) 

443 ) 

444 

445 atoms[i] = xi_atoms_arr 

446 else: 

447 Λ, Q = np.linalg.eig(self.Sigma) 

448 

449 A = Q @ np.diag(np.sqrt(Λ)) 

450 

451 if decomp == "sqrt": 

452 A = A @ Q.T 

453 

454 for i in range(self.M): 

455 mui = self.mu[i] 

456 params = A[i] 

457 

458 Z_list = [z_bins for _ in range(self.M)] 

459 

460 Z_bins = [np.array(x) for x in list(product(*Z_list))] 

461 

462 xi_atoms = [] 

463 

464 for z_bin in Z_bins: 

465 atom = np.exp(mui) * eval(params, z_bin) 

466 xi_atoms.append(atom) 

467 

468 inners_i = [inners for _ in range((N + 2 * tail_N) ** i)] 

469 

470 interiors[i] = np.repeat( 

471 [*inners_i], (N + 2 * tail_N) ** (self.M - (i + 1)) 

472 ) 

473 

474 atoms[i] = xi_atoms 

475 

476 max_locs = np.argmax(np.abs(interiors), axis=0) 

477 

478 max_inds = np.stack([max_locs, np.arange(len(max_locs))], axis=1) 

479 

480 prob_locs = interiors[max_inds[:, 0], max_inds[:, 1]] 

481 

482 def prob_assign(x): 

483 if x == 0: 

484 return 1 / (N**self.M) 

485 else: 

486 return 0.0 

487 

488 prob_vec = np.vectorize(prob_assign) 

489 

490 pmv = prob_vec(prob_locs) 

491 

492 limit = { 

493 "dist": self, 

494 "method": "equiprobable", 

495 "N": N, 

496 "endpoints": endpoints, 

497 "tail_bound": tail_bound, 

498 "decomp": decomp, 

499 } 

500 

501 return DiscreteDistribution( 

502 pmv=pmv, 

503 atoms=atoms, 

504 seed=self._rng.integers(0, 2**31 - 1, dtype="int32"), 

505 limit=limit, 

506 )