Coverage for HARK/distributions/multivariate.py: 28%

179 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-02 05:14 +0000

1from itertools import product 

2from typing import List, Union 

3 

4import numpy as np 

5from numpy import linalg 

6from scipy import special 

7from scipy.stats._multivariate import multi_rv_frozen, multivariate_normal_frozen 

8 

9from HARK.distributions.base import Distribution 

10from HARK.distributions.continuous import Lognormal, Normal 

11from HARK.distributions.discrete import DiscreteDistribution 

12 

13# MULTIVARIATE DISTRIBUTIONS 

14 

15 

16class MultivariateNormal(multivariate_normal_frozen, Distribution): 

17 """ 

18 A Multivariate Normal distribution. 

19 

20 Parameters 

21 ---------- 

22 mu : numpy array 

23 Mean vector. 

24 Sigma : 2-d numpy array. Each dimension must have length equal to that of 

25 mu. 

26 Variance-covariance matrix. 

27 seed : int 

28 Seed for random number generator. 

29 """ 

30 

31 def __init__(self, mu=[1, 1], Sigma=[[1, 0], [0, 1]], seed=0): 

32 self.mu = np.asarray(mu) 

33 self.Sigma = np.asarray(Sigma) 

34 self.M = self.mu.size 

35 multivariate_normal_frozen.__init__(self, mean=self.mu, cov=self.Sigma) 

36 Distribution.__init__(self, seed=seed) 

37 self.infimum = -np.inf * np.ones(self.M) 

38 self.supremum = np.inf * np.ones(self.M) 

39 

40 def discretize(self, N, method="hermite", endpoints=False): 

41 """ 

42 For multivariate normal distributions, the Gauss-Hermite 

43 quadrature rule is used as the default method for discretization. 

44 """ 

45 

46 return self._approx(N, method=method, endpoints=endpoints) 

47 

48 def _approx(self, N, method="hermite", endpoints=False): 

49 """ 

50 Returns a discrete approximation of this distribution. 

51 

52 The discretization will have N**M points, where M is the dimension of 

53 the multivariate normal. 

54 

55 It uses the fact that: 

56 - Being positive definite, Sigma can be factorized as Sigma = QVQ', 

57 with V diagonal. So, letting A=Q*sqrt(V), Sigma = A*A'. 

58 - If Z is an N-dimensional multivariate standard normal, then 

59 A*Z ~ N(0,A*A' = Sigma). 

60 

61 The idea therefore is to construct an equiprobable grid for a standard 

62 normal and multiply it by matrix A. 

63 """ 

64 

65 # Start by computing matrix A. 

66 v, Q = np.linalg.eig(self.Sigma) 

67 sqrtV = np.diag(np.sqrt(v)) 

68 A = np.matmul(Q, sqrtV) 

69 

70 # Now find a discretization for a univariate standard normal. 

71 

72 z_approx = Normal().discretize(N, method=method) 

73 

74 # Now create the multivariate grid and pmv 

75 Z = np.array(list(product(*[z_approx.atoms.flatten()] * self.M))) 

76 pmv = np.prod(np.array(list(product(*[z_approx.pmv] * self.M))), axis=1) 

77 

78 # Apply mean and standard deviation to the Z grid 

79 atoms = self.mu[None, ...] + np.matmul(Z, A.T) 

80 

81 limit = { 

82 "dist": self, 

83 "method": method, 

84 "N": N, 

85 "endpoints": endpoints, 

86 } 

87 

88 # Construct and return discrete distribution 

89 return DiscreteDistribution( 

90 pmv, 

91 atoms.T, 

92 seed=self._rng.integers(0, 2**31 - 1, dtype="int32"), 

93 limit=limit, 

94 ) 

95 

96 

97class MultivariateLogNormal(multi_rv_frozen, Distribution): 

98 """ 

99 A Multivariate Lognormal distribution. 

100 

101 Parameters 

102 ---------- 

103 mu : Union[list, numpy.ndarray], optional 

104 Means of underlying multivariate normal, default [0.0, 0.0]. 

105 Sigma : Union[list, numpy.ndarray], optional 

106 nxn variance-covariance matrix of underlying multivariate normal, default [[1.0, 0.0], [0.0, 1.0]]. 

107 seed : int, optional 

108 Seed for random number generator, default 0. 

109 """ 

110 

111 def __init__( 

112 self, 

113 mu: Union[List, np.ndarray] = [0.0, 0.0], 

114 Sigma: Union[List, np.ndarray] = [[1.0, 0.0], [0.0, 1.0]], 

115 seed=None, 

116 ): 

117 self.mu = np.asarray(mu) 

118 self.Sigma = np.asarray(Sigma) 

119 self.M = self.mu.size 

120 

121 if self.Sigma.shape != (self.M, self.M): 

122 raise AttributeError(f"Sigma must be a {self.M}x{self.M} matrix") 

123 

124 if np.array_equal(self.Sigma, self.Sigma.T): 

125 res = np.all(np.linalg.eigvals(self.Sigma) >= 0) 

126 if not res: 

127 raise AttributeError("Sigma must be positive semi-definite") 

128 else: 

129 raise AttributeError("Sigma must be positive semi-definite") 

130 

131 multi_rv_frozen.__init__(self) 

132 Distribution.__init__(self, seed=seed) 

133 

134 def mean(self): 

135 """ 

136 Mean of the distribution. 

137 

138 Returns 

139 ------- 

140 np.ndarray 

141 Mean of the distribution. 

142 """ 

143 

144 return np.exp(self.mu + 0.5 * np.diag(self.Sigma)) 

145 

146 def _cdf(self, x: Union[list, np.ndarray]): 

147 """ 

148 Cumulative distribution function of the distribution evaluated at x. 

149 

150 Parameters 

151 ---------- 

152 x : np.ndarray 

153 Point at which to evaluate the CDF. 

154 

155 Returns 

156 ------- 

157 float 

158 CDF evaluated at x. 

159 """ 

160 

161 x = np.asarray(x) 

162 

163 if (x.shape != self.M) & (x.shape[1] != self.M): 

164 raise ValueError(f"x must be and {self.M}-dimensional input") 

165 

166 return MultivariateNormal(mu=self.mu, Sigma=self.Sigma).cdf(np.log(x)) 

167 

168 def _pdf(self, x: Union[list, np.ndarray]): 

169 """ 

170 Probability density function of the distribution evaluated at x. 

171 

172 Parameters 

173 ---------- 

174 x : Union[list, np.ndarray] 

175 Point at which to evaluate the PDF. 

176 

177 Returns 

178 ------- 

179 float 

180 PDF evaluated at x. 

181 """ 

182 

183 x = np.asarray(x) 

184 

185 if (x.shape != (self.M,)) & (x.shape[1] != self.M): 

186 raise ValueError(f"x must be an {self.M}-dimensional input") 

187 

188 eigenvalues = linalg.eigvals(self.Sigma) 

189 

190 pseudo_det = np.prod(eigenvalues[eigenvalues > 1e-12]) 

191 

192 inverse_sigma = linalg.pinv(self.Sigma) 

193 

194 rank_sigma = linalg.matrix_rank(self.Sigma) 

195 

196 pd = np.multiply( 

197 (1 / np.prod(x, axis=1)), 

198 (2 * np.pi) ** (-rank_sigma / 2) 

199 * pseudo_det ** (-0.5) 

200 * np.exp(-(1 / 2) * np.multiply(np.log(x) @ inverse_sigma, np.log(x))), 

201 ) 

202 

203 return pd 

204 

205 def _marginal(self, x: Union[np.ndarray, float, list], dim: int): 

206 """ 

207 Marginal distribution of one of the variables in the bivariate distribution evaluated at x. 

208 

209 Parameters 

210 ---------- 

211 x : Union[np.ndarray, float] 

212 Point at which to evaluate the marginal distribution. 

213 dim : int 

214 Which of the random variables to evaluate (1 or 2). 

215 

216 Returns 

217 ------- 

218 float 

219 Marginal distribution evaluated at x. 

220 """ 

221 

222 x = np.asarray(x) 

223 

224 x_dim = Lognormal( 

225 mu=self.mu[dim - 1], sigma=np.sqrt(self.Sigma[dim - 1, dim - 1]) 

226 ) 

227 

228 return x_dim.pdf(x) 

229 

230 def _marginal_cdf(self, x: Union[np.ndarray, float, list], dim: int): 

231 """ 

232 Cumulative distribution function of one of the variables from the bivariate distribution evaluated at x. 

233 

234 Parameters 

235 ---------- 

236 x : Union[np.ndarray, float] 

237 Point at which to evaluate the marginal CDF. 

238 dim : int 

239 Which of the random variables to evaluate (1 or 2). 

240 

241 Returns 

242 ------- 

243 float 

244 Marginal CDF evaluated at x. 

245 """ 

246 

247 x = np.asarray(x) 

248 

249 x_dim = Lognormal( 

250 mu=self.mu[dim - 1], sigma=np.sqrt(self.Sigma[dim - 1, dim - 1]) 

251 ) 

252 

253 return x_dim.cdf(x) 

254 

255 def rvs(self, size: int = 1, random_state=None): 

256 """ 

257 Random sample from the distribution. 

258 

259 Parameters 

260 ---------- 

261 size : int 

262 Number of data points to generate. 

263 random_state : optional 

264 Seed for random number generator. 

265 

266 Returns 

267 ------- 

268 np.ndarray 

269 Random sample from the distribution. 

270 """ 

271 

272 Z = MultivariateNormal(mu=self.mu, Sigma=self.Sigma) 

273 

274 return np.exp(Z.rvs(size, random_state=random_state)) 

275 

276 def _approx_equiprobable( 

277 self, 

278 N: int, 

279 tail_bound: Union[float, list, tuple] = None, 

280 endpoints: bool = False, 

281 decomp: str = "cholesky", 

282 ): 

283 """ 

284 Makes a discrete approximation using the equiprobable method to this multivariate lognormal distribution. 

285 

286 Parameters 

287 ---------- 

288 N : int 

289 The number of points in the discrete approximation. 

290 tail_bound : Union[float, list, tuple], optional 

291 The values of the CDF according to which the distribution is truncated. If only a single number is specified, it is the lower tail bound and a symmetric upper bound is chosen. Can make one-tailed approximations with 0.0 or 1.0 as the lower and upper bound respectively. By default the distribution is not truncated. 

292 endpoints : bool 

293 If endpoints is True, then atoms at the corner points of the truncated region are included. By default, endpoints is False, which is when only the interior points are included. 

294 decomp : str in ["cholesky", "sqrt", "eig"], optional 

295 The method of decomposing the covariance matrix. Available options are the Cholesky decomposition, the positive-definite square root, and the eigendecomposition. By default the Cholesky decomposition is used. NOTE: The method of decomposition might affect the expectations of the discretized distribution along each dimension dfferently. 

296 

297 Returns 

298 ------- 

299 d : DiscreteDistribution 

300 Probability associated with each point in array of discrete 

301 points for discrete probability mass function. 

302 """ 

303 

304 if endpoints: 

305 tail_N = 1 

306 else: 

307 tail_N = 0 

308 

309 if decomp not in ["cholesky", "sqrt", "eig"]: 

310 raise NotImplementedError( 

311 "Decomposition method must be 'cholesky', 'sqrt' or 'eig'" 

312 ) 

313 

314 if np.array_equal(self.Sigma, np.diag(np.diag(self.Sigma))): 

315 ind_atoms = np.empty((self.M, N + 2 * tail_N)) 

316 

317 for i in range(self.M): 

318 if self.Sigma[i, i] == 0.0: 

319 x_atoms = np.repeat(np.exp(self.mu[i]), N + 2 * tail_N) 

320 ind_atoms[i] = x_atoms 

321 else: 

322 x_atoms = ( 

323 Lognormal(mu=self.mu[i], sigma=np.sqrt(self.Sigma[i, i])) 

324 ._approx_equiprobable( 

325 N, tail_N=tail_N, tail_bound=tail_bound, endpoints=endpoints 

326 ) 

327 .atoms 

328 ) 

329 ind_atoms[i] = x_atoms 

330 

331 atoms_list = [ind_atoms[i] for i in range(self.M)] 

332 atoms = np.stack( 

333 [ar.flatten() for ar in list(np.meshgrid(*atoms_list))], axis=1 

334 ).T 

335 

336 interiors = np.empty([self.M, (N + 2 * tail_N) ** (self.M)]) 

337 

338 inners = np.zeros(N + 2 * tail_N) 

339 

340 if tail_N > 0: 

341 inners[:tail_N] = [(tail_N - i) for i in range(tail_N)] 

342 inners[-tail_N:] = [(i + 1) for i in range(tail_N)] 

343 

344 for i in range(self.M): 

345 inners_i = [inners for _ in range((N + 2 * tail_N) ** i)] 

346 

347 interiors[i] = np.repeat( 

348 [*inners_i], (N + 2 * tail_N) ** (self.M - (i + 1)) 

349 ) 

350 

351 else: 

352 if tail_bound is not None: 

353 if type(tail_bound) is float: 

354 tail_bound = [tail_bound, 1 - tail_bound] 

355 

356 if tail_bound[0] < 0 or tail_bound[1] > 1: 

357 raise ValueError("Tail bounds must be between 0 and 1") 

358 

359 cdf_cuts = np.linspace(tail_bound[0], tail_bound[1], N + 1) 

360 int_prob = tail_bound[1] - tail_bound[0] 

361 

362 else: 

363 cdf_cuts = np.linspace(0, 1, N + 1) 

364 int_prob = 1.0 

365 

366 Z = Normal() 

367 

368 z_cuts = np.empty(2 * tail_N + N + 1) 

369 if tail_N > 0: 

370 z_cuts[0:tail_N] = Z.ppf(cdf_cuts[0]) 

371 z_cuts[-tail_N:] = Z.ppf(cdf_cuts[-1]) 

372 

373 z_cuts[tail_N : tail_N + N + 1] = Z.ppf(cdf_cuts) 

374 z_bins = [(z_cuts[i], z_cuts[i + 1]) for i in range(N + 2 * tail_N)] 

375 

376 atoms = np.empty([self.M, (N + (2 * tail_N)) ** self.M]) 

377 

378 interiors = np.empty([self.M, (N + 2 * tail_N) ** (self.M)]) 

379 

380 inners = np.zeros(N + 2 * tail_N) 

381 

382 if tail_N > 0: 

383 inners[:tail_N] = [(tail_N - i) for i in range(tail_N)] 

384 inners[-tail_N:] = [(i + 1) for i in range(tail_N)] 

385 

386 def eval(params, z): 

387 inds = [] 

388 excl = [] 

389 

390 for j in range(len(z)): 

391 if z[j, 0] == z[j, 1]: 

392 excl.append(j) 

393 elif params[j] != 0.0: 

394 inds.append(j) 

395 

396 dim = len(inds) 

397 

398 p = np.repeat(params[inds], 2).reshape(dim, 2) 

399 

400 Z = np.multiply(p, z[inds]) 

401 

402 bounds = ((p**2 - Z) / (np.sqrt(2) * p)).T 

403 

404 if len(inds) > 0: 

405 x_exp = np.prod( 

406 -0.5 

407 * np.exp(np.square(params[inds]) / 2) 

408 * (special.erf(bounds[1]) - special.erf(bounds[0])) 

409 ) 

410 else: 

411 x_exp = 1 

412 

413 if len(excl) > 0: 

414 x_others = np.prod(np.exp(np.multiply(params[excl], z[excl, 1]))) 

415 else: 

416 x_others = 1 

417 

418 return x_exp * x_others * (N / int_prob) ** dim 

419 

420 if decomp == "cholesky": 

421 L = np.linalg.cholesky(self.Sigma) 

422 

423 for i in range(self.M): 

424 mui = self.mu[i] 

425 params = L[i, 0 : i + 1] 

426 

427 Z_list = [z_bins for _ in range(i + 1)] 

428 

429 Z_bins = [np.array(x) for x in list(product(*Z_list))] 

430 

431 xi_atoms = [] 

432 

433 for z_bin in Z_bins: 

434 atom = np.exp(mui) * eval(params, z_bin) 

435 xi_atoms.append(atom) 

436 

437 xi_atoms_arr = np.repeat( 

438 xi_atoms, (N + 2 * tail_N) ** (self.M - (i + 1)) 

439 ) 

440 

441 inners_i = [inners for _ in range((N + 2 * tail_N) ** i)] 

442 

443 interiors[i] = np.repeat( 

444 [*inners_i], (N + 2 * tail_N) ** (self.M - (i + 1)) 

445 ) 

446 

447 atoms[i] = xi_atoms_arr 

448 else: 

449 Λ, Q = np.linalg.eig(self.Sigma) 

450 

451 A = Q @ np.diag(np.sqrt(Λ)) 

452 

453 if decomp == "sqrt": 

454 A = A @ Q.T 

455 

456 for i in range(self.M): 

457 mui = self.mu[i] 

458 params = A[i] 

459 

460 Z_list = [z_bins for _ in range(self.M)] 

461 

462 Z_bins = [np.array(x) for x in list(product(*Z_list))] 

463 

464 xi_atoms = [] 

465 

466 for z_bin in Z_bins: 

467 atom = np.exp(mui) * eval(params, z_bin) 

468 xi_atoms.append(atom) 

469 

470 inners_i = [inners for _ in range((N + 2 * tail_N) ** i)] 

471 

472 interiors[i] = np.repeat( 

473 [*inners_i], (N + 2 * tail_N) ** (self.M - (i + 1)) 

474 ) 

475 

476 atoms[i] = xi_atoms 

477 

478 max_locs = np.argmax(np.abs(interiors), axis=0) 

479 

480 max_inds = np.stack([max_locs, np.arange(len(max_locs))], axis=1) 

481 

482 prob_locs = interiors[max_inds[:, 0], max_inds[:, 1]] 

483 

484 def prob_assign(x): 

485 if x == 0: 

486 return 1 / (N**self.M) 

487 else: 

488 return 0.0 

489 

490 prob_vec = np.vectorize(prob_assign) 

491 

492 pmv = prob_vec(prob_locs) 

493 

494 limit = { 

495 "dist": self, 

496 "method": "equiprobable", 

497 "N": N, 

498 "endpoints": endpoints, 

499 "tail_bound": tail_bound, 

500 "decomp": decomp, 

501 } 

502 

503 return DiscreteDistribution( 

504 pmv=pmv, 

505 atoms=atoms, 

506 seed=self._rng.integers(0, 2**31 - 1, dtype="int32"), 

507 limit=limit, 

508 )