Coverage for HARK / distributions / base.py: 91%

109 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-25 05:22 +0000

1from typing import Any, Optional 

2 

3import numpy as np 

4from numpy import random 

5 

6MAX_INT32 = 2**31 - 1 

7 

8 

9def random_seed(): 

10 """ 

11 Generate a random seed for use in random number generation. This random seed 

12 is derived from the system clock and other variables, and is therefore 

13 different every time the code is run. 

14 For discussion on random number generation and random seeds, see 

15 https://docs.scipy.org/doc/scipy/tutorial/stats.html#random-number-generation 

16 Parameters 

17 ---------- 

18 None 

19 Returns 

20 ------- 

21 seed : int 

22 Random seed. 

23 """ 

24 return random.SeedSequence().entropy 

25 

26 

27class Distribution: 

28 """ 

29 Base class for all probability distributions 

30 with seed and random number generator. 

31 

32 Parameters 

33 ---------- 

34 seed : Optional[int] 

35 Seed for random number generator. 

36 """ 

37 

38 def __init__(self, seed: Optional[int] = None) -> None: 

39 """ 

40 Generic distribution class with seed management. 

41 

42 Parameters 

43 ---------- 

44 seed : Optional[int], optional 

45 Seed for random number generator, by default None 

46 generates random seed based on entropy. 

47 

48 Raises 

49 ------ 

50 ValueError 

51 Seed must be an integer type. 

52 """ 

53 self.seed = seed 

54 

55 # Bounds of distribution support should be overwritten by subclasses 

56 self.infimum = np.array([]) 

57 self.supremum = np.array([]) 

58 

59 @property 

60 def seed(self) -> int: 

61 """ 

62 Seed for random number generator. 

63 

64 Returns 

65 ------- 

66 int 

67 Seed. 

68 """ 

69 return self._seed 

70 

71 @seed.setter 

72 def seed(self, seed: int) -> None: 

73 """ 

74 Set seed for random number generator. 

75 

76 Parameters 

77 ---------- 

78 seed : int 

79 Seed for random number generator. 

80 """ 

81 

82 if seed is None: 

83 # random seed from entropy 

84 self._seed = random_seed() 

85 elif isinstance(seed, (int, np.integer)): 

86 self._seed = seed 

87 else: 

88 raise ValueError("seed must be an integer") 

89 

90 # set random number generator with seed 

91 self._rng = random.default_rng(self._seed) 

92 

93 def reset(self) -> None: 

94 """ 

95 Reset the random number generator of this distribution. 

96 Resetting the seed will result in the same sequence of 

97 random numbers being generated. 

98 

99 Parameters 

100 ---------- 

101 """ 

102 self._rng = random.default_rng(self.seed) 

103 

104 def random_seed(self) -> int: 

105 """ 

106 Generate a new random seed derived from the random seed in this distribution. 

107 """ 

108 return self._rng.integers(0, MAX_INT32, dtype=np.int32) 

109 

110 def draw(self, N: int) -> np.ndarray: 

111 """ 

112 Generate arrays of draws from this distribution. 

113 If input N is a number, output is a length N array of draws from the 

114 distribution. If N is a list, output is a length T list whose 

115 t-th entry is a length N array of draws from the distribution[t]. 

116 

117 Parameters 

118 ---------- 

119 N : int 

120 Number of draws in each row. 

121 

122 Returns: 

123 ------------ 

124 draws : np.array or [np.array] 

125 T-length list of arrays of random variable draws each of size n, or 

126 a single array of size N (if sigma is a scalar). 

127 """ 

128 return self.rvs(size=N, random_state=self._rng).T 

129 

130 def discretize( 

131 self, N: int, method: str = "equiprobable", endpoints: bool = False, **kwds: Any 

132 ) -> "DiscreteDistribution": 

133 """ 

134 Discretize the distribution into N points using the specified method. 

135 

136 Parameters 

137 ---------- 

138 N : int 

139 Number of points in the discretization. 

140 method : str, optional 

141 Method for discretization, by default "equiprobable" 

142 endpoints : bool, optional 

143 Whether to include endpoints in the discretization, by default False 

144 

145 Returns 

146 ------- 

147 discretized_dstn : DiscreteDistribution 

148 Discretized distribution. 

149 

150 Raises 

151 ------ 

152 NotImplementedError 

153 If method is not implemented for this distribution. 

154 """ 

155 

156 approx_method = "_approx_" + method 

157 

158 if not hasattr(self, approx_method): 

159 raise NotImplementedError( 

160 "discretize() with method = {} not implemented for {} class".format( 

161 method, self.__class__.__name__ 

162 ) 

163 ) 

164 

165 approx = getattr(self, approx_method) 

166 discretized_dstn = approx(N, endpoints, **kwds) 

167 discretized_dstn.limit["infimum"] = self.infimum.copy() 

168 discretized_dstn.limit["supremum"] = self.supremum.copy() 

169 return discretized_dstn 

170 

171 

172class MarkovProcess(Distribution): 

173 """ 

174 A representation of a discrete Markov process. 

175 

176 Parameters 

177 ---------- 

178 transition_matrix : np.array 

179 An array of floats representing a probability mass for 

180 each state transition. 

181 seed : int 

182 Seed for random number generator. 

183 

184 """ 

185 

186 transition_matrix = None 

187 

188 def __init__(self, transition_matrix, seed=0): 

189 """ 

190 Initialize a discrete distribution. 

191 

192 """ 

193 self.transition_matrix = transition_matrix 

194 

195 # Set up the RNG 

196 super().__init__(seed) 

197 

198 def draw(self, state): 

199 """ 

200 Draw new states fromt the transition matrix. 

201 

202 Parameters 

203 ---------- 

204 state : int or nd.array 

205 The state or states (1-D array) from which to draw new states. 

206 

207 Returns 

208 ------- 

209 new_state : int or nd.array 

210 New states. 

211 """ 

212 

213 def sample(s): 

214 return self._rng.choice( 

215 self.transition_matrix.shape[1], p=self.transition_matrix[s, :] 

216 ) 

217 

218 array_sample = np.frompyfunc(sample, 1, 1) 

219 

220 return array_sample(state) 

221 

222 

223class IndexDistribution(Distribution): 

224 """ 

225 This class provides a way to define a distribution that 

226 is conditional on an index. 

227 

228 The current implementation combines a defined distribution 

229 class (such as Bernoulli, LogNormal, etc.) with information 

230 about the conditions on the parameters of the distribution. 

231 

232 It can also wrap a list of pre-discretized distributions (previously 

233 provided by TimeVaryingDiscreteDistribution) and provide the same API. 

234 

235 Parameters 

236 ---------- 

237 

238 engine : Distribution class 

239 A Distribution subclass. 

240 

241 conditional: dict 

242 Information about the conditional variation on the input parameters of the engine 

243 distribution. Keys should match the arguments to the engine class constructor. 

244 

245 distributions: [DiscreteDistribution] 

246 Optional. A list of discrete distributions to wrap directly. 

247 

248 seed : int 

249 Seed for random number generator. 

250 """ 

251 

252 conditional = None 

253 engine = None 

254 

255 def __init__( 

256 self, engine=None, conditional=None, distributions=None, RNG=None, seed=None 

257 ): 

258 if RNG is None: 

259 # Set up the RNG 

260 super().__init__(seed) 

261 else: 

262 # If an RNG is received, use it in whatever state it is in. 

263 self._rng = RNG 

264 # The seed will still be set, even if it is not used for the RNG, 

265 # for whenever self.reset() is called. 

266 # Note that self.reset() will stop using the RNG that was passed 

267 # and create a new one. 

268 self.seed = seed 

269 

270 # Mode 1: wrapping a list of discrete distributions 

271 if distributions is not None: 

272 self.distributions = distributions 

273 self.engine = None 

274 self.conditional = None 

275 self.dstns = [] 

276 return 

277 

278 # Mode 2: engine + conditional parameters (original IndexDistribution) 

279 self.conditional = conditional if conditional is not None else {} 

280 self.engine = engine 

281 

282 self.dstns = [] 

283 

284 # If no engine/conditional were provided, this is an invalid state. 

285 if self.engine is None and not self.conditional: 

286 raise ValueError( 

287 "MarkovProcess: No engine or conditional parameters provided; this should not happen in normal use." 

288 ) 

289 

290 # Test one item to determine case handling 

291 item0 = list(self.conditional.values())[0] 

292 

293 if type(item0) is list: 

294 # Create and store all the conditional distributions 

295 for y in range(len(item0)): 

296 cond = {key: val[y] for (key, val) in self.conditional.items()} 

297 self.dstns.append(self.engine(seed=self.random_seed(), **cond)) 

298 

299 elif type(item0) is float: 

300 self.dstns = [self.engine(seed=self.random_seed(), **self.conditional)] 

301 

302 else: 

303 raise ( 

304 Exception( 

305 f"IndexDistribution: Unhandled case for __getitem__ access. item0: {item0}; conditional: {self.conditional}" 

306 ) 

307 ) 

308 

309 def __getitem__(self, y): 

310 # Prefer discrete list mode if present 

311 if hasattr(self, "distributions") and self.distributions: 

312 return self.distributions[y] 

313 return self.dstns[y] 

314 

315 def reset(self): 

316 # Reset the main RNG and each member distribution 

317 super().reset() 

318 for d in self.dstns: 

319 d.reset() 

320 

321 def discretize(self, N, **kwds): 

322 """ 

323 Approximation of the distribution. 

324 

325 Parameters 

326 ---------- 

327 N : init 

328 Number of discrete points to approximate 

329 continuous distribution into. 

330 

331 kwds: dict 

332 Other keyword arguments passed to engine 

333 distribution approx() method. 

334 

335 Returns: 

336 ------------ 

337 dists : [DiscreteDistribution] or IndexDistribution 

338 If parameterization is constant, returns a single DiscreteDistribution. 

339 If parameterization varies with index, returns an IndexDistribution in 

340 discrete-list mode, wrapping the corresponding discrete distributions. 

341 """ 

342 

343 # If already in discrete list mode, return self (already discretized) 

344 if hasattr(self, "distributions") and self.distributions: 

345 return self 

346 

347 # test one item to determine case handling 

348 item0 = list(self.conditional.values())[0] 

349 

350 if type(item0) is float: 

351 # degenerate case. Treat the parameterization as constant. 

352 return self.dstns[0].discretize(N, **kwds) 

353 

354 if type(item0) is list: 

355 # Return an IndexDistribution wrapping a list of discrete distributions 

356 return IndexDistribution( 

357 distributions=[ 

358 self[i].discretize(N, **kwds) for i, _ in enumerate(item0) 

359 ], 

360 seed=self.seed, 

361 ) 

362 

363 def draw(self, condition): 

364 """ 

365 Generate arrays of draws. 

366 The input is an array containing the conditions. 

367 The output is an array of the same length (axis 1 dimension) 

368 as the conditions containing random draws of the conditional 

369 distribution. 

370 

371 Parameters 

372 ---------- 

373 condition : np.array 

374 The input conditions to the distribution. 

375 

376 Returns: 

377 ------------ 

378 draws : np.array 

379 """ 

380 # for now, assume that all the conditionals 

381 # are of the same type. 

382 # this matches the HARK 'time-varying' model architecture. 

383 

384 # If wrapping discrete distributions, draw from those 

385 if hasattr(self, "distributions") and self.distributions: 

386 draws = np.zeros(condition.size) 

387 for c in np.unique(condition): 

388 these = c == condition 

389 N = np.sum(these) 

390 draws[these] = self.distributions[c].draw(N) 

391 return draws 

392 

393 # test one item to determine case handling 

394 item0 = list(self.conditional.values())[0] 

395 

396 if type(item0) is float: 

397 # degenerate case. Treat the parameterization as constant. 

398 N = condition.size 

399 

400 return self.engine(seed=self.random_seed(), **self.conditional).draw(N) 

401 

402 if type(item0) is list: 

403 # conditions are indices into list 

404 # somewhat convoluted sampling strategy retained 

405 # for test backwards compatibility 

406 

407 draws = np.zeros(condition.size) 

408 

409 for c in np.unique(condition): 

410 these = c == condition 

411 N = np.sum(these) 

412 

413 draws[these] = self[c].draw(N) 

414 

415 return draws