Coverage for HARK/distributions/base.py: 89%

108 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-02 05:14 +0000

1from typing import Any, Optional 

2 

3import numpy as np 

4from numpy import random 

5 

6 

7class Distribution: 

8 """ 

9 Base class for all probability distributions 

10 with seed and random number generator. 

11 

12 For discussion on random number generation and random seeds, see 

13 https://docs.scipy.org/doc/scipy/tutorial/stats.html#random-number-generation 

14 

15 Parameters 

16 ---------- 

17 seed : Optional[int] 

18 Seed for random number generator. 

19 """ 

20 

21 def __init__(self, seed: Optional[int] = 0) -> None: 

22 """ 

23 Generic distribution class with seed management. 

24 

25 Parameters 

26 ---------- 

27 seed : Optional[int], optional 

28 Seed for random number generator, by default None 

29 generates random seed based on entropy. 

30 

31 Raises 

32 ------ 

33 ValueError 

34 Seed must be an integer type. 

35 """ 

36 if seed is None: 

37 # generate random seed 

38 _seed: int = random.SeedSequence().entropy 

39 elif isinstance(seed, (int, np.integer)): 

40 _seed = seed 

41 else: 

42 raise ValueError("seed must be an integer") 

43 

44 self._seed: int = _seed 

45 self._rng: random.Generator = random.default_rng(self._seed) 

46 

47 # Bounds of distribution support should be overwritten by subclasses 

48 self.infimum = np.array([]) 

49 self.supremum = np.array([]) 

50 

51 @property 

52 def seed(self) -> int: 

53 """ 

54 Seed for random number generator. 

55 

56 Returns 

57 ------- 

58 int 

59 Seed. 

60 """ 

61 return self._seed # type: ignore 

62 

63 @seed.setter 

64 def seed(self, seed: int) -> None: 

65 """ 

66 Set seed for random number generator. 

67 

68 Parameters 

69 ---------- 

70 seed : int 

71 Seed for random number generator. 

72 """ 

73 

74 if isinstance(seed, (int, np.integer)): 

75 self._seed = seed 

76 self._rng = random.default_rng(seed) 

77 else: 

78 raise ValueError("seed must be an integer") 

79 

80 def reset(self) -> None: 

81 """ 

82 Reset the random number generator of this distribution. 

83 Resetting the seed will result in the same sequence of 

84 random numbers being generated. 

85 

86 Parameters 

87 ---------- 

88 """ 

89 self._rng = random.default_rng(self.seed) 

90 

91 def random_seed(self) -> None: 

92 """ 

93 Generate a new random seed for this distribution. 

94 """ 

95 self.seed = random.SeedSequence().entropy 

96 

97 def draw(self, N: int) -> np.ndarray: 

98 """ 

99 Generate arrays of draws from this distribution. 

100 If input N is a number, output is a length N array of draws from the 

101 distribution. If N is a list, output is a length T list whose 

102 t-th entry is a length N array of draws from the distribution[t]. 

103 

104 Parameters 

105 ---------- 

106 N : int 

107 Number of draws in each row. 

108 

109 Returns: 

110 ------------ 

111 draws : np.array or [np.array] 

112 T-length list of arrays of random variable draws each of size n, or 

113 a single array of size N (if sigma is a scalar). 

114 """ 

115 

116 mean = self.mean() if callable(self.mean) else self.mean 

117 size = (N, mean.size) if mean.size != 1 else N 

118 return self.rvs(size=size, random_state=self._rng) 

119 

120 def discretize( 

121 self, N: int, method: str = "equiprobable", endpoints: bool = False, **kwds: Any 

122 ) -> "DiscreteDistribution": 

123 """ 

124 Discretize the distribution into N points using the specified method. 

125 

126 Parameters 

127 ---------- 

128 N : int 

129 Number of points in the discretization. 

130 method : str, optional 

131 Method for discretization, by default "equiprobable" 

132 endpoints : bool, optional 

133 Whether to include endpoints in the discretization, by default False 

134 

135 Returns 

136 ------- 

137 discretized_dstn : DiscreteDistribution 

138 Discretized distribution. 

139 

140 Raises 

141 ------ 

142 NotImplementedError 

143 If method is not implemented for this distribution. 

144 """ 

145 

146 approx_method = "_approx_" + method 

147 

148 if not hasattr(self, approx_method): 

149 raise NotImplementedError( 

150 "discretize() with method = {} not implemented for {} class".format( 

151 method, self.__class__.__name__ 

152 ) 

153 ) 

154 

155 approx = getattr(self, approx_method) 

156 discretized_dstn = approx(N, endpoints, **kwds) 

157 discretized_dstn.limit["infimum"] = self.infimum.copy() 

158 discretized_dstn.limit["supremum"] = self.supremum.copy() 

159 return discretized_dstn 

160 

161 

162class MarkovProcess(Distribution): 

163 """ 

164 A representation of a discrete Markov process. 

165 

166 Parameters 

167 ---------- 

168 transition_matrix : np.array 

169 An array of floats representing a probability mass for 

170 each state transition. 

171 seed : int 

172 Seed for random number generator. 

173 

174 """ 

175 

176 transition_matrix = None 

177 

178 def __init__(self, transition_matrix, seed=0): 

179 """ 

180 Initialize a discrete distribution. 

181 

182 """ 

183 self.transition_matrix = transition_matrix 

184 

185 # Set up the RNG 

186 super().__init__(seed) 

187 

188 def draw(self, state): 

189 """ 

190 Draw new states fromt the transition matrix. 

191 

192 Parameters 

193 ---------- 

194 state : int or nd.array 

195 The state or states (1-D array) from which to draw new states. 

196 

197 Returns 

198 ------- 

199 new_state : int or nd.array 

200 New states. 

201 """ 

202 

203 def sample(s): 

204 return self._rng.choice( 

205 self.transition_matrix.shape[1], p=self.transition_matrix[s, :] 

206 ) 

207 

208 array_sample = np.frompyfunc(sample, 1, 1) 

209 

210 return array_sample(state) 

211 

212 

213class IndexDistribution(Distribution): 

214 """ 

215 This class provides a way to define a distribution that 

216 is conditional on an index. 

217 

218 The current implementation combines a defined distribution 

219 class (such as Bernoulli, LogNormal, etc.) with information 

220 about the conditions on the parameters of the distribution. 

221 

222 It can also wrap a list of pre-discretized distributions (previously 

223 provided by TimeVaryingDiscreteDistribution) and provide the same API. 

224 

225 Parameters 

226 ---------- 

227 

228 engine : Distribution class 

229 A Distribution subclass. 

230 

231 conditional: dict 

232 Information about the conditional variation 

233 on the input parameters of the engine distribution. 

234 Keys should match the arguments to the engine class 

235 constructor. 

236 

237 distributions: [DiscreteDistribution] 

238 Optional. A list of discrete distributions to wrap directly. 

239 

240 seed : int 

241 Seed for random number generator. 

242 """ 

243 

244 conditional = None 

245 engine = None 

246 

247 def __init__( 

248 self, engine=None, conditional=None, distributions=None, RNG=None, seed=0 

249 ): 

250 if RNG is None: 

251 # Set up the RNG 

252 super().__init__(seed) 

253 else: 

254 # If an RNG is received, use it in whatever state it is in. 

255 self._rng = RNG 

256 # The seed will still be set, even if it is not used for the RNG, 

257 # for whenever self.reset() is called. 

258 # Note that self.reset() will stop using the RNG that was passed 

259 # and create a new one. 

260 self.seed = seed 

261 

262 # Mode 1: wrapping a list of discrete distributions 

263 if distributions is not None: 

264 self.distributions = distributions 

265 self.engine = None 

266 self.conditional = None 

267 self.dstns = [] 

268 return 

269 

270 # Mode 2: engine + conditional parameters (original IndexDistribution) 

271 self.conditional = conditional if conditional is not None else {} 

272 self.engine = engine 

273 

274 self.dstns = [] 

275 

276 # If no engine/conditional were provided, this is an invalid state. 

277 if self.engine is None and not self.conditional: 

278 raise ValueError( 

279 "MarkovProcess: No engine or conditional parameters provided; this should not happen in normal use." 

280 ) 

281 

282 # Test one item to determine case handling 

283 item0 = list(self.conditional.values())[0] 

284 

285 if type(item0) is list: 

286 # Create and store all the conditional distributions 

287 for y in range(len(item0)): 

288 cond = {key: val[y] for (key, val) in self.conditional.items()} 

289 self.dstns.append( 

290 self.engine(seed=self._rng.integers(0, 2**31 - 1), **cond) 

291 ) 

292 

293 elif type(item0) is float: 

294 self.dstns = [ 

295 self.engine(seed=self._rng.integers(0, 2**31 - 1), **self.conditional) 

296 ] 

297 

298 else: 

299 raise ( 

300 Exception( 

301 f"IndexDistribution: Unhandled case for __getitem__ access. item0: {item0}; conditional: {self.conditional}" 

302 ) 

303 ) 

304 

305 def __getitem__(self, y): 

306 # Prefer discrete list mode if present 

307 if hasattr(self, "distributions") and self.distributions: 

308 return self.distributions[y] 

309 return self.dstns[y] 

310 

311 def discretize(self, N, **kwds): 

312 """ 

313 Approximation of the distribution. 

314 

315 Parameters 

316 ---------- 

317 N : init 

318 Number of discrete points to approximate 

319 continuous distribution into. 

320 

321 kwds: dict 

322 Other keyword arguments passed to engine 

323 distribution approx() method. 

324 

325 Returns: 

326 ------------ 

327 dists : [DiscreteDistribution] or IndexDistribution 

328 If parameterization is constant, returns a single DiscreteDistribution. 

329 If parameterization varies with index, returns an IndexDistribution in 

330 discrete-list mode, wrapping the corresponding discrete distributions. 

331 """ 

332 

333 # If already in discrete list mode, return self (already discretized) 

334 if hasattr(self, "distributions") and self.distributions: 

335 return self 

336 

337 # test one item to determine case handling 

338 item0 = list(self.conditional.values())[0] 

339 

340 if type(item0) is float: 

341 # degenerate case. Treat the parameterization as constant. 

342 return self.dstns[0].discretize(N, **kwds) 

343 

344 if type(item0) is list: 

345 # Return an IndexDistribution wrapping a list of discrete distributions 

346 return IndexDistribution( 

347 distributions=[ 

348 self[i].discretize(N, **kwds) for i, _ in enumerate(item0) 

349 ], 

350 seed=self.seed, 

351 ) 

352 

353 def draw(self, condition): 

354 """ 

355 Generate arrays of draws. 

356 The input is an array containing the conditions. 

357 The output is an array of the same length (axis 1 dimension) 

358 as the conditions containing random draws of the conditional 

359 distribution. 

360 

361 Parameters 

362 ---------- 

363 condition : np.array 

364 The input conditions to the distribution. 

365 

366 Returns: 

367 ------------ 

368 draws : np.array 

369 """ 

370 # for now, assume that all the conditionals 

371 # are of the same type. 

372 # this matches the HARK 'time-varying' model architecture. 

373 

374 # If wrapping discrete distributions, draw from those 

375 if hasattr(self, "distributions") and self.distributions: 

376 draws = np.zeros(condition.size) 

377 for c in np.unique(condition): 

378 these = c == condition 

379 N = np.sum(these) 

380 draws[these] = self.distributions[c].draw(N) 

381 return draws 

382 

383 # test one item to determine case handling 

384 item0 = list(self.conditional.values())[0] 

385 

386 if type(item0) is float: 

387 # degenerate case. Treat the parameterization as constant. 

388 N = condition.size 

389 

390 return self.engine( 

391 seed=self._rng.integers(0, 2**31 - 1), **self.conditional 

392 ).draw(N) 

393 

394 if type(item0) is list: 

395 # conditions are indices into list 

396 # somewhat convoluted sampling strategy retained 

397 # for test backwards compatibility 

398 

399 draws = np.zeros(condition.size) 

400 

401 for c in np.unique(condition): 

402 these = c == condition 

403 N = np.sum(these) 

404 

405 draws[these] = self[c].draw(N) 

406 

407 return draws