Coverage for HARK / distributions / continuous.py: 91%

149 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-10 06:19 +0000

1from typing import Any, Optional, Union 

2 

3import numpy as np 

4from scipy.special import erfc 

5from scipy.stats import ( 

6 rv_continuous, 

7 norm, 

8 lognorm, 

9 uniform, 

10 weibull_min, 

11) 

12from scipy.stats._distn_infrastructure import rv_continuous_frozen 

13 

14from HARK.distributions.base import Distribution 

15from HARK.distributions.discrete import DiscreteDistribution 

16 

17 

18def _hermite_discretize(distribution, N, endpoints, atoms_transform, lower_endpoint): 

19 """Gauss-Hermite quadrature discretization shared by Normal/Lognormal. 

20 

21 ``atoms_transform`` maps the linear ``sqrt(2)*sigma*x + mu`` grid to the 

22 distribution's support (identity for Normal, ``np.exp`` for Lognormal). 

23 ``lower_endpoint`` is the value prepended when ``endpoints=True``. 

24 """ 

25 x, w = np.polynomial.hermite.hermgauss(N) 

26 pmv = w * np.pi**-0.5 

27 atoms = atoms_transform(np.sqrt(2.0) * distribution.sigma * x + distribution.mu) 

28 if endpoints: 

29 atoms = np.r_[lower_endpoint, atoms, np.inf] 

30 pmv = np.r_[0.0, pmv, 0.0] 

31 return DiscreteDistribution( 

32 pmv, 

33 atoms, 

34 seed=distribution.random_seed(), 

35 limit={ 

36 "dist": distribution, 

37 "method": "hermite", 

38 "N": N, 

39 "endpoints": endpoints, 

40 }, 

41 ) 

42 

43 

44# CONTINUOUS DISTRIBUTIONS 

45 

46 

47class ContinuousFrozenDistribution(rv_continuous_frozen, Distribution): 

48 """ 

49 Parameterized continuous distribution from scipy.stats with seed management. 

50 """ 

51 

52 def __init__( 

53 self, dist: rv_continuous, *args: Any, seed: int = 0, **kwds: Any 

54 ) -> None: 

55 """ 

56 Parameterized continuous distribution from scipy.stats with seed management. 

57 

58 Parameters 

59 ---------- 

60 dist : rv_continuous 

61 Continuous distribution from scipy.stats. 

62 seed : int, optional 

63 Seed for random number generator, by default 0 

64 """ 

65 rv_continuous_frozen.__init__(self, dist, *args, **kwds) 

66 Distribution.__init__(self, seed=seed) 

67 

68 

69class Normal(ContinuousFrozenDistribution): 

70 """ 

71 A Normal distribution. 

72 

73 Parameters 

74 ---------- 

75 mu : float or [float] 

76 One or more means. Number of elements T in mu determines number 

77 of rows of output. 

78 sigma : float or [float] 

79 One or more standard deviations. Number of elements T in sigma 

80 determines number of rows of output. 

81 seed : int 

82 Seed for random number generator. 

83 """ 

84 

85 def __new__( 

86 cls, 

87 mu: Union[float, np.ndarray] = 0.0, 

88 sigma: Union[float, np.ndarray] = 1.0, 

89 seed: Optional[int] = None, 

90 ): 

91 """ 

92 Create a new Normal distribution. If sigma is zero, return a 

93 DiscreteDistribution with a single atom at mu. 

94 Parameters 

95 ---------- 

96 mu : Union[float, np.ndarray], optional 

97 Mean of normal distribution, by default 0.0 

98 sigma : Union[float, np.ndarray], optional 

99 Standard deviation of normal distribution, by default 1.0 

100 seed : Optional[int], optional 

101 Seed for random number generator, by default None 

102 Returns 

103 ------- 

104 Normal or DiscreteDistribution 

105 Normal distribution or DiscreteDistribution with a single atom. 

106 """ 

107 

108 if sigma == 0: 

109 # If sigma is zero, return a DiscreteDistribution with a single atom 

110 return DiscreteDistribution([1.0], mu, seed=seed) 

111 

112 return super().__new__(cls) 

113 

114 def __init__(self, mu=0.0, sigma=1.0, seed=None): 

115 self.mu = np.asarray(mu) 

116 self.sigma = np.asarray(sigma) 

117 

118 if self.mu.size != self.sigma.size: 

119 raise AttributeError( 

120 f"'mu' and 'sigma' must be of the same size. Instead 'mu' is of " 

121 f"size {self.mu.size} and 'sigma' is of size {self.sigma.size}." 

122 ) 

123 

124 super().__init__(norm, loc=mu, scale=sigma, seed=seed) 

125 self.infimum = -np.inf * np.ones(self.mu.size) 

126 self.supremum = np.inf * np.ones(self.mu.size) 

127 

128 def discretize(self, N, method="hermite", endpoints=False): 

129 """ 

130 For normal distributions, the Gauss-Hermite quadrature rule is 

131 used as the default method for discretization. 

132 """ 

133 

134 return super().discretize(N, method, endpoints) 

135 

136 def _approx_hermite(self, N, endpoints=False): 

137 """ 

138 Returns a discrete approximation of this distribution 

139 using the Gauss-Hermite quadrature rule. 

140 

141 Parameters 

142 ---------- 

143 N : int 

144 Number of discrete points to approximate the distribution. 

145 

146 Returns 

147 ------- 

148 DiscreteDistribution 

149 Discrete approximation of this distribution. 

150 """ 

151 return _hermite_discretize(self, N, endpoints, lambda v: v, -np.inf) 

152 

153 def _approx_equiprobable(self, N, endpoints=False): 

154 """ 

155 Returns a discrete equiprobable approximation of this distribution. 

156 

157 Parameters 

158 ---------- 

159 N : int 

160 Number of discrete points to approximate the distribution. 

161 

162 Returns 

163 ------- 

164 DiscreteDistribution 

165 Discrete approximation of this distribution. 

166 """ 

167 

168 CDF = np.linspace(0, 1, N + 1) 

169 lims = norm.ppf(CDF) 

170 pdf = norm.pdf(lims) 

171 

172 # Find conditional means using Mills's ratio 

173 pmv = np.diff(CDF) 

174 atoms = self.mu - np.diff(pdf) / pmv * self.sigma 

175 

176 if endpoints: 

177 atoms = np.r_[-np.inf, atoms, np.inf] 

178 pmv = np.r_[0.0, pmv, 0.0] 

179 

180 limit = {"dist": self, "method": "equiprobable", "N": N, "endpoints": endpoints} 

181 

182 return DiscreteDistribution( 

183 pmv, 

184 atoms, 

185 seed=self._rng.integers(0, 2**31 - 1, dtype="int32"), 

186 limit=limit, 

187 ) 

188 

189 

190class Lognormal(ContinuousFrozenDistribution): 

191 """ 

192 A Lognormal distribution 

193 

194 Parameters 

195 ---------- 

196 mu : float or [float] 

197 One or more means of underlying normal distribution. 

198 Number of elements T in mu determines number of rows of output. 

199 sigma : float or [float] 

200 One or more standard deviations of underlying normal distribution. 

201 Number of elements T in sigma determines number of rows of output. 

202 seed : int 

203 Seed for random number generator. 

204 """ 

205 

206 def __new__( 

207 cls, 

208 mu: Union[float, np.ndarray] = 0.0, 

209 sigma: Union[float, np.ndarray] = 1.0, 

210 seed: Optional[int] = None, 

211 mean=None, 

212 std=None, 

213 ): 

214 """ 

215 Create a new Lognormal distribution. If sigma is zero, return a 

216 DiscreteDistribution with a single atom at exp(mu). 

217 

218 Parameters 

219 ---------- 

220 mu : Union[float, np.ndarray], optional 

221 Mean of underlying normal distribution, by default 0.0 

222 sigma : Union[float, np.ndarray], optional 

223 Standard deviation of underlying normal distribution, by default 1.0 

224 seed : Optional[int], optional 

225 Seed for random number generator, by default None 

226 mean: optional 

227 For alternative mean/std parameterization, the mean of the lognormal distribution 

228 std: optional 

229 For alternative mean/std parameterization, the standard deviation of the lognormal distribution 

230 

231 Returns 

232 ------- 

233 Lognormal or DiscreteDistribution 

234 Lognormal distribution or DiscreteDistribution with a single atom. 

235 """ 

236 

237 if sigma == 0: 

238 # If sigma is zero, return a DiscreteDistribution with a single atom 

239 if mean is not None: 

240 return DiscreteDistribution([1.0], mean, seed=seed) 

241 return DiscreteDistribution([1.0], [np.exp(mu)], seed=seed) 

242 

243 return super().__new__(cls) 

244 

245 def __init__( 

246 self, 

247 mu: Union[float, np.ndarray] = 0.0, 

248 sigma: Union[float, np.ndarray] = 1.0, 

249 seed: Optional[int] = None, 

250 mean=None, 

251 std=None, 

252 ): 

253 if mean is not None and std is not None: 

254 mean_squared = mean**2 

255 variance = std**2 

256 mu = np.log(mean / (np.sqrt(1.0 + variance / mean_squared))) 

257 sigma = np.sqrt(np.log(1.0 + variance / mean_squared)) 

258 

259 if mean is not None and std is None: 

260 mu = np.log(mean) - sigma**2 / 2 

261 

262 self.mu = np.asarray(mu) 

263 self.sigma = np.asarray(sigma) 

264 

265 if self.mu.size != self.sigma.size: 

266 raise AttributeError( 

267 "mu and sigma must be of same size, are %s, %s" 

268 % ((self.mu.size), (self.sigma.size)) 

269 ) 

270 

271 # Set up the RNG 

272 super().__init__(lognorm, s=self.sigma, scale=np.exp(self.mu), loc=0, seed=seed) 

273 self.infimum = np.array([0.0]) 

274 self.supremum = np.array([np.inf]) 

275 

276 def _approx_equiprobable( 

277 self, N, endpoints=False, tail_N=0, tail_bound=None, tail_order=np.e 

278 ): 

279 """ 

280 Construct a discrete approximation to a lognormal distribution with underlying 

281 normal distribution N(mu,sigma). Makes an equiprobable distribution by 

282 default, but user can optionally request augmented tails with exponentially 

283 sized point masses. This can improve solution accuracy in some models. 

284 

285 Parameters 

286 ---------- 

287 N: int 

288 Number of discrete points in the "main part" of the approximation. 

289 tail_N: int 

290 Number of points in each "tail part" of the approximation; 0 = no tail. 

291 tail_bound: [float] 

292 CDF boundaries of the tails vs main portion; tail_bound[0] is the lower 

293 tail bound, tail_bound[1] is the upper tail bound. Inoperative when 

294 tail_N = 0. Can make "one tailed" approximations with 0.0 or 1.0. 

295 tail_order: float 

296 Factor by which consecutive point masses in a "tail part" differ in 

297 probability. Should be >= 1 for sensible spacing. 

298 

299 Returns 

300 ------- 

301 d : DiscreteDistribution 

302 Probability associated with each point in array of discrete 

303 points for discrete probability mass function. 

304 """ 

305 tail_bound = tail_bound or [0.02, 0.98] 

306 

307 # Handle the trivial case first 

308 if self.sigma == 0.0: 

309 pmv = np.ones(N) / N 

310 atoms = np.exp(self.mu) * np.ones(N) 

311 

312 else: 

313 # Find the CDF boundaries of each segment 

314 if tail_N > 0: 

315 lo_cut = tail_bound[0] 

316 hi_cut = tail_bound[1] 

317 else: 

318 lo_cut = 0.0 

319 hi_cut = 1.0 

320 inner_size = hi_cut - lo_cut 

321 inner_CDF_vals = [ 

322 lo_cut + x * N ** (-1.0) * inner_size for x in range(1, N) 

323 ] 

324 if inner_size < 1.0: 

325 scale = 1.0 / tail_order 

326 mag = (1.0 - scale**tail_N) / (1.0 - scale) 

327 lower_CDF_vals = [0.0] 

328 if lo_cut > 0.0: 

329 for x in range(tail_N - 1, -1, -1): 

330 lower_CDF_vals.append(lower_CDF_vals[-1] + lo_cut * scale**x / mag) 

331 upper_CDF_vals = [hi_cut] 

332 if hi_cut < 1.0: 

333 for x in range(tail_N): 

334 upper_CDF_vals.append( 

335 upper_CDF_vals[-1] + (1.0 - hi_cut) * scale**x / mag 

336 ) 

337 CDF_vals = np.array(lower_CDF_vals + inner_CDF_vals + upper_CDF_vals) 

338 CDF_vals[-1] = 1.0 

339 CDF_vals[0] = 0.0 # somehow these need fixing sometimes 

340 

341 # Calculate probability masses for each node 

342 pmv = CDF_vals[1:] - CDF_vals[:-1] 

343 pmv /= np.sum(pmv) 

344 

345 # Translate the CDF values to z-scores (stdevs from mean), then get q-scores 

346 z_cuts = norm.ppf(CDF_vals) 

347 q_cuts = (z_cuts - self.sigma) / np.sqrt(2) 

348 

349 # Evaluate the (complementary) error function at the q values 

350 erf_q = erfc(q_cuts) 

351 erf_q_neg = erfc(-q_cuts) 

352 

353 # Evaluate the base for the conditional expectations 

354 vals_base = erf_q[:-1] - erf_q[1:] 

355 these = q_cuts[:-1] < -2.0 # flag low q values and use the *other* version 

356 vals_base[these] = erf_q_neg[1:][these] - erf_q_neg[:-1][these] 

357 

358 # Make and apply the normalization factor and probability weights 

359 norm_fac = 0.5 * np.exp(self.mu + 0.5 * self.sigma**2) / pmv 

360 atoms = vals_base * norm_fac 

361 

362 if endpoints: 

363 atoms = np.r_[0.0, atoms, np.inf] 

364 pmv = np.r_[0.0, pmv, 0.0] 

365 

366 limit = { 

367 "dist": self, 

368 "method": "equiprobable", 

369 "N": N, 

370 "endpoints": endpoints, 

371 "tail_N": tail_N, 

372 "tail_bound": tail_bound, 

373 "tail_order": tail_order, 

374 } 

375 

376 return DiscreteDistribution( 

377 pmv, 

378 atoms, 

379 seed=self.random_seed(), 

380 limit=limit, 

381 ) 

382 

383 def _approx_hermite(self, N, endpoints=False): 

384 """ 

385 Returns a discrete approximation of this distribution 

386 using the Gauss-Hermite quadrature rule. 

387 

388 Parameters 

389 ---------- 

390 N : int 

391 Number of discrete points to approximate the distribution. 

392 

393 Returns 

394 ------- 

395 DiscreteDistribution 

396 Discrete approximation of this distribution. 

397 """ 

398 

399 return _hermite_discretize(self, N, endpoints, np.exp, 0.0) 

400 

401 @classmethod 

402 def from_mean_std(cls, mean, std, seed=None): 

403 """ 

404 Construct a LogNormal distribution from its 

405 mean and standard deviation. 

406 

407 This is unlike the normal constructor for the distribution, 

408 which takes the mu and sigma for the normal distribution 

409 that is the logarithm of the Log Normal distribution. 

410 

411 Parameters 

412 ---------- 

413 mean : float or [float] 

414 One or more means. Number of elements T in mu determines number 

415 of rows of output. 

416 std : float or [float] 

417 One or more standard deviations. Number of elements T in sigma 

418 determines number of rows of output. 

419 seed : int 

420 Seed for random number generator. 

421 

422 Returns 

423 --------- 

424 LogNormal 

425 

426 """ 

427 mean_squared = mean**2 

428 variance = std**2 

429 mu = np.log(mean / (np.sqrt(1.0 + variance / mean_squared))) 

430 sigma = np.sqrt(np.log(1.0 + variance / mean_squared)) 

431 

432 return cls(mu=mu, sigma=sigma, seed=seed) 

433 

434 

435LogNormal = Lognormal 

436 

437 

438class MeanOneLogNormal(Lognormal): 

439 """ 

440 A Lognormal distribution with mean 1. 

441 """ 

442 

443 def __init__(self, sigma=1.0, seed=0): 

444 mu = -0.5 * sigma**2 

445 super().__init__(mu=mu, sigma=sigma, seed=seed) 

446 

447 

448class Uniform(ContinuousFrozenDistribution): 

449 """ 

450 A Uniform distribution. 

451 

452 Parameters 

453 ---------- 

454 bot : float or [float] 

455 One or more bottom values. 

456 Number of elements T in mu determines number 

457 of rows of output. 

458 top : float or [float] 

459 One or more top values. 

460 Number of elements T in top determines number of 

461 rows of output. 

462 seed : int 

463 Seed for random number generator. 

464 """ 

465 

466 def __init__(self, bot=0.0, top=1.0, seed=None): 

467 self.bot = np.asarray(bot) 

468 self.top = np.asarray(top) 

469 

470 super().__init__(uniform, loc=self.bot, scale=self.top - self.bot, seed=seed) 

471 self.infimum = np.array([0.0]) 

472 self.supremum = np.array([np.inf]) 

473 

474 def _approx_equiprobable(self, N, endpoints=False): 

475 """ 

476 Makes a discrete approximation to this uniform distribution. 

477 

478 Parameters 

479 ---------- 

480 N : int 

481 The number of points in the discrete approximation. 

482 endpoints : bool 

483 Whether to include the endpoints in the approximation. 

484 

485 Returns 

486 ------- 

487 d : DiscreteDistribution 

488 Probability associated with each point in array of discrete 

489 points for discrete probability mass function. 

490 """ 

491 pmv = np.ones(N) / float(N) 

492 

493 center = (self.top + self.bot) / 2.0 

494 width = (self.top - self.bot) / 2.0 

495 atoms = center + width * np.linspace(-(N - 1.0) / 2.0, (N - 1.0) / 2.0, N) / ( 

496 N / 2.0 

497 ) 

498 

499 if endpoints: # insert endpoints with infinitesimally small mass 

500 atoms = np.concatenate(([self.bot], atoms, [self.top])) 

501 pmv = np.concatenate(([0.0], pmv, [0.0])) 

502 

503 limit = { 

504 "dist": self, 

505 "method": "equiprobable", 

506 "N": N, 

507 "endpoints": endpoints, 

508 } 

509 

510 return DiscreteDistribution( 

511 pmv, 

512 atoms, 

513 seed=self.random_seed(), 

514 limit=limit, 

515 ) 

516 

517 

518class Weibull(ContinuousFrozenDistribution): 

519 """ 

520 A Weibull distribution. 

521 

522 Parameters 

523 ---------- 

524 scale : float or [float] 

525 One or more scales. Number of elements T in scale 

526 determines number of 

527 rows of output. 

528 shape : float or [float] 

529 One or more shape parameters. Number of elements T in scale 

530 determines number of rows of output. 

531 seed : int 

532 Seed for random number generator. 

533 """ 

534 

535 def __init__(self, scale=1.0, shape=1.0, seed=None): 

536 self.scale = np.asarray(scale) 

537 self.shape = np.asarray(shape) 

538 

539 # Set up the RNG 

540 super().__init__(weibull_min, c=shape, scale=scale, seed=seed) 

541 self.infimum = np.array([0.0]) 

542 self.supremum = np.array([np.inf])