Coverage for HARK / distributions / base.py: 89%

110 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-07 05:16 +0000

1from typing import Any, Optional 

2 

3import numpy as np 

4from numpy import random 

5 

6 

7class Distribution: 

8 """ 

9 Base class for all probability distributions 

10 with seed and random number generator. 

11 

12 For discussion on random number generation and random seeds, see 

13 https://docs.scipy.org/doc/scipy/tutorial/stats.html#random-number-generation 

14 

15 Parameters 

16 ---------- 

17 seed : Optional[int] 

18 Seed for random number generator. 

19 """ 

20 

21 def __init__(self, seed: Optional[int] = 0) -> None: 

22 """ 

23 Generic distribution class with seed management. 

24 

25 Parameters 

26 ---------- 

27 seed : Optional[int], optional 

28 Seed for random number generator, by default None 

29 generates random seed based on entropy. 

30 

31 Raises 

32 ------ 

33 ValueError 

34 Seed must be an integer type. 

35 """ 

36 if seed is None: 

37 # generate random seed 

38 _seed: int = random.SeedSequence().entropy 

39 elif isinstance(seed, (int, np.integer)): 

40 _seed = seed 

41 else: 

42 raise ValueError("seed must be an integer") 

43 

44 self._seed: int = _seed 

45 self._rng: random.Generator = random.default_rng(self._seed) 

46 

47 # Bounds of distribution support should be overwritten by subclasses 

48 self.infimum = np.array([]) 

49 self.supremum = np.array([]) 

50 

51 @property 

52 def seed(self) -> int: 

53 """ 

54 Seed for random number generator. 

55 

56 Returns 

57 ------- 

58 int 

59 Seed. 

60 """ 

61 return self._seed # type: ignore 

62 

63 @seed.setter 

64 def seed(self, seed: int) -> None: 

65 """ 

66 Set seed for random number generator. 

67 

68 Parameters 

69 ---------- 

70 seed : int 

71 Seed for random number generator. 

72 """ 

73 

74 if isinstance(seed, (int, np.integer)): 

75 self._seed = seed 

76 self._rng = random.default_rng(seed) 

77 else: 

78 raise ValueError("seed must be an integer") 

79 

80 def reset(self) -> None: 

81 """ 

82 Reset the random number generator of this distribution. 

83 Resetting the seed will result in the same sequence of 

84 random numbers being generated. 

85 

86 Parameters 

87 ---------- 

88 """ 

89 self._rng = random.default_rng(self.seed) 

90 

91 def random_seed(self) -> None: 

92 """ 

93 Generate a new random seed for this distribution. 

94 """ 

95 self.seed = random.SeedSequence().entropy 

96 

97 def draw(self, N: int) -> np.ndarray: 

98 """ 

99 Generate arrays of draws from this distribution. 

100 If input N is a number, output is a length N array of draws from the 

101 distribution. If N is a list, output is a length T list whose 

102 t-th entry is a length N array of draws from the distribution[t]. 

103 

104 Parameters 

105 ---------- 

106 N : int 

107 Number of draws in each row. 

108 

109 Returns: 

110 ------------ 

111 draws : np.array or [np.array] 

112 T-length list of arrays of random variable draws each of size n, or 

113 a single array of size N (if sigma is a scalar). 

114 """ 

115 return self.rvs(size=N, random_state=self._rng).T 

116 

117 def discretize( 

118 self, N: int, method: str = "equiprobable", endpoints: bool = False, **kwds: Any 

119 ) -> "DiscreteDistribution": 

120 """ 

121 Discretize the distribution into N points using the specified method. 

122 

123 Parameters 

124 ---------- 

125 N : int 

126 Number of points in the discretization. 

127 method : str, optional 

128 Method for discretization, by default "equiprobable" 

129 endpoints : bool, optional 

130 Whether to include endpoints in the discretization, by default False 

131 

132 Returns 

133 ------- 

134 discretized_dstn : DiscreteDistribution 

135 Discretized distribution. 

136 

137 Raises 

138 ------ 

139 NotImplementedError 

140 If method is not implemented for this distribution. 

141 """ 

142 

143 approx_method = "_approx_" + method 

144 

145 if not hasattr(self, approx_method): 

146 raise NotImplementedError( 

147 "discretize() with method = {} not implemented for {} class".format( 

148 method, self.__class__.__name__ 

149 ) 

150 ) 

151 

152 approx = getattr(self, approx_method) 

153 discretized_dstn = approx(N, endpoints, **kwds) 

154 discretized_dstn.limit["infimum"] = self.infimum.copy() 

155 discretized_dstn.limit["supremum"] = self.supremum.copy() 

156 return discretized_dstn 

157 

158 

159class MarkovProcess(Distribution): 

160 """ 

161 A representation of a discrete Markov process. 

162 

163 Parameters 

164 ---------- 

165 transition_matrix : np.array 

166 An array of floats representing a probability mass for 

167 each state transition. 

168 seed : int 

169 Seed for random number generator. 

170 

171 """ 

172 

173 transition_matrix = None 

174 

175 def __init__(self, transition_matrix, seed=0): 

176 """ 

177 Initialize a discrete distribution. 

178 

179 """ 

180 self.transition_matrix = transition_matrix 

181 

182 # Set up the RNG 

183 super().__init__(seed) 

184 

185 def draw(self, state): 

186 """ 

187 Draw new states fromt the transition matrix. 

188 

189 Parameters 

190 ---------- 

191 state : int or nd.array 

192 The state or states (1-D array) from which to draw new states. 

193 

194 Returns 

195 ------- 

196 new_state : int or nd.array 

197 New states. 

198 """ 

199 

200 def sample(s): 

201 return self._rng.choice( 

202 self.transition_matrix.shape[1], p=self.transition_matrix[s, :] 

203 ) 

204 

205 array_sample = np.frompyfunc(sample, 1, 1) 

206 

207 return array_sample(state) 

208 

209 

210class IndexDistribution(Distribution): 

211 """ 

212 This class provides a way to define a distribution that 

213 is conditional on an index. 

214 

215 The current implementation combines a defined distribution 

216 class (such as Bernoulli, LogNormal, etc.) with information 

217 about the conditions on the parameters of the distribution. 

218 

219 It can also wrap a list of pre-discretized distributions (previously 

220 provided by TimeVaryingDiscreteDistribution) and provide the same API. 

221 

222 Parameters 

223 ---------- 

224 

225 engine : Distribution class 

226 A Distribution subclass. 

227 

228 conditional: dict 

229 Information about the conditional variation on the input parameters of the engine 

230 distribution. Keys should match the arguments to the engine class constructor. 

231 

232 distributions: [DiscreteDistribution] 

233 Optional. A list of discrete distributions to wrap directly. 

234 

235 seed : int 

236 Seed for random number generator. 

237 """ 

238 

239 conditional = None 

240 engine = None 

241 

242 def __init__( 

243 self, engine=None, conditional=None, distributions=None, RNG=None, seed=0 

244 ): 

245 if RNG is None: 

246 # Set up the RNG 

247 super().__init__(seed) 

248 else: 

249 # If an RNG is received, use it in whatever state it is in. 

250 self._rng = RNG 

251 # The seed will still be set, even if it is not used for the RNG, 

252 # for whenever self.reset() is called. 

253 # Note that self.reset() will stop using the RNG that was passed 

254 # and create a new one. 

255 self.seed = seed 

256 

257 # Mode 1: wrapping a list of discrete distributions 

258 if distributions is not None: 

259 self.distributions = distributions 

260 self.engine = None 

261 self.conditional = None 

262 self.dstns = [] 

263 return 

264 

265 # Mode 2: engine + conditional parameters (original IndexDistribution) 

266 self.conditional = conditional if conditional is not None else {} 

267 self.engine = engine 

268 

269 self.dstns = [] 

270 

271 # If no engine/conditional were provided, this is an invalid state. 

272 if self.engine is None and not self.conditional: 

273 raise ValueError( 

274 "MarkovProcess: No engine or conditional parameters provided; this should not happen in normal use." 

275 ) 

276 

277 # Test one item to determine case handling 

278 item0 = list(self.conditional.values())[0] 

279 

280 if type(item0) is list: 

281 # Create and store all the conditional distributions 

282 for y in range(len(item0)): 

283 cond = {key: val[y] for (key, val) in self.conditional.items()} 

284 self.dstns.append( 

285 self.engine(seed=self._rng.integers(0, 2**31 - 1), **cond) 

286 ) 

287 

288 elif type(item0) is float: 

289 self.dstns = [ 

290 self.engine(seed=self._rng.integers(0, 2**31 - 1), **self.conditional) 

291 ] 

292 

293 else: 

294 raise ( 

295 Exception( 

296 f"IndexDistribution: Unhandled case for __getitem__ access. item0: {item0}; conditional: {self.conditional}" 

297 ) 

298 ) 

299 

300 def __getitem__(self, y): 

301 # Prefer discrete list mode if present 

302 if hasattr(self, "distributions") and self.distributions: 

303 return self.distributions[y] 

304 return self.dstns[y] 

305 

306 def reset(self): 

307 # Reset the main RNG and each member distribution 

308 super().reset() 

309 for d in self.dstns: 

310 d.reset() 

311 

312 def discretize(self, N, **kwds): 

313 """ 

314 Approximation of the distribution. 

315 

316 Parameters 

317 ---------- 

318 N : init 

319 Number of discrete points to approximate 

320 continuous distribution into. 

321 

322 kwds: dict 

323 Other keyword arguments passed to engine 

324 distribution approx() method. 

325 

326 Returns: 

327 ------------ 

328 dists : [DiscreteDistribution] or IndexDistribution 

329 If parameterization is constant, returns a single DiscreteDistribution. 

330 If parameterization varies with index, returns an IndexDistribution in 

331 discrete-list mode, wrapping the corresponding discrete distributions. 

332 """ 

333 

334 # If already in discrete list mode, return self (already discretized) 

335 if hasattr(self, "distributions") and self.distributions: 

336 return self 

337 

338 # test one item to determine case handling 

339 item0 = list(self.conditional.values())[0] 

340 

341 if type(item0) is float: 

342 # degenerate case. Treat the parameterization as constant. 

343 return self.dstns[0].discretize(N, **kwds) 

344 

345 if type(item0) is list: 

346 # Return an IndexDistribution wrapping a list of discrete distributions 

347 return IndexDistribution( 

348 distributions=[ 

349 self[i].discretize(N, **kwds) for i, _ in enumerate(item0) 

350 ], 

351 seed=self.seed, 

352 ) 

353 

354 def draw(self, condition): 

355 """ 

356 Generate arrays of draws. 

357 The input is an array containing the conditions. 

358 The output is an array of the same length (axis 1 dimension) 

359 as the conditions containing random draws of the conditional 

360 distribution. 

361 

362 Parameters 

363 ---------- 

364 condition : np.array 

365 The input conditions to the distribution. 

366 

367 Returns: 

368 ------------ 

369 draws : np.array 

370 """ 

371 # for now, assume that all the conditionals 

372 # are of the same type. 

373 # this matches the HARK 'time-varying' model architecture. 

374 

375 # If wrapping discrete distributions, draw from those 

376 if hasattr(self, "distributions") and self.distributions: 

377 draws = np.zeros(condition.size) 

378 for c in np.unique(condition): 

379 these = c == condition 

380 N = np.sum(these) 

381 draws[these] = self.distributions[c].draw(N) 

382 return draws 

383 

384 # test one item to determine case handling 

385 item0 = list(self.conditional.values())[0] 

386 

387 if type(item0) is float: 

388 # degenerate case. Treat the parameterization as constant. 

389 N = condition.size 

390 

391 return self.engine( 

392 seed=self._rng.integers(0, 2**31 - 1), **self.conditional 

393 ).draw(N) 

394 

395 if type(item0) is list: 

396 # conditions are indices into list 

397 # somewhat convoluted sampling strategy retained 

398 # for test backwards compatibility 

399 

400 draws = np.zeros(condition.size) 

401 

402 for c in np.unique(condition): 

403 these = c == condition 

404 N = np.sum(these) 

405 

406 draws[these] = self[c].draw(N) 

407 

408 return draws