Coverage for HARK / distributions / base.py: 91%
109 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-25 05:22 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-25 05:22 +0000
1from typing import Any, Optional
3import numpy as np
4from numpy import random
6MAX_INT32 = 2**31 - 1
9def random_seed():
10 """
11 Generate a random seed for use in random number generation. This random seed
12 is derived from the system clock and other variables, and is therefore
13 different every time the code is run.
14 For discussion on random number generation and random seeds, see
15 https://docs.scipy.org/doc/scipy/tutorial/stats.html#random-number-generation
16 Parameters
17 ----------
18 None
19 Returns
20 -------
21 seed : int
22 Random seed.
23 """
24 return random.SeedSequence().entropy
27class Distribution:
28 """
29 Base class for all probability distributions
30 with seed and random number generator.
32 Parameters
33 ----------
34 seed : Optional[int]
35 Seed for random number generator.
36 """
38 def __init__(self, seed: Optional[int] = None) -> None:
39 """
40 Generic distribution class with seed management.
42 Parameters
43 ----------
44 seed : Optional[int], optional
45 Seed for random number generator, by default None
46 generates random seed based on entropy.
48 Raises
49 ------
50 ValueError
51 Seed must be an integer type.
52 """
53 self.seed = seed
55 # Bounds of distribution support should be overwritten by subclasses
56 self.infimum = np.array([])
57 self.supremum = np.array([])
59 @property
60 def seed(self) -> int:
61 """
62 Seed for random number generator.
64 Returns
65 -------
66 int
67 Seed.
68 """
69 return self._seed
71 @seed.setter
72 def seed(self, seed: int) -> None:
73 """
74 Set seed for random number generator.
76 Parameters
77 ----------
78 seed : int
79 Seed for random number generator.
80 """
82 if seed is None:
83 # random seed from entropy
84 self._seed = random_seed()
85 elif isinstance(seed, (int, np.integer)):
86 self._seed = seed
87 else:
88 raise ValueError("seed must be an integer")
90 # set random number generator with seed
91 self._rng = random.default_rng(self._seed)
93 def reset(self) -> None:
94 """
95 Reset the random number generator of this distribution.
96 Resetting the seed will result in the same sequence of
97 random numbers being generated.
99 Parameters
100 ----------
101 """
102 self._rng = random.default_rng(self.seed)
104 def random_seed(self) -> int:
105 """
106 Generate a new random seed derived from the random seed in this distribution.
107 """
108 return self._rng.integers(0, MAX_INT32, dtype=np.int32)
110 def draw(self, N: int) -> np.ndarray:
111 """
112 Generate arrays of draws from this distribution.
113 If input N is a number, output is a length N array of draws from the
114 distribution. If N is a list, output is a length T list whose
115 t-th entry is a length N array of draws from the distribution[t].
117 Parameters
118 ----------
119 N : int
120 Number of draws in each row.
122 Returns:
123 ------------
124 draws : np.array or [np.array]
125 T-length list of arrays of random variable draws each of size n, or
126 a single array of size N (if sigma is a scalar).
127 """
128 return self.rvs(size=N, random_state=self._rng).T
130 def discretize(
131 self, N: int, method: str = "equiprobable", endpoints: bool = False, **kwds: Any
132 ) -> "DiscreteDistribution":
133 """
134 Discretize the distribution into N points using the specified method.
136 Parameters
137 ----------
138 N : int
139 Number of points in the discretization.
140 method : str, optional
141 Method for discretization, by default "equiprobable"
142 endpoints : bool, optional
143 Whether to include endpoints in the discretization, by default False
145 Returns
146 -------
147 discretized_dstn : DiscreteDistribution
148 Discretized distribution.
150 Raises
151 ------
152 NotImplementedError
153 If method is not implemented for this distribution.
154 """
156 approx_method = "_approx_" + method
158 if not hasattr(self, approx_method):
159 raise NotImplementedError(
160 "discretize() with method = {} not implemented for {} class".format(
161 method, self.__class__.__name__
162 )
163 )
165 approx = getattr(self, approx_method)
166 discretized_dstn = approx(N, endpoints, **kwds)
167 discretized_dstn.limit["infimum"] = self.infimum.copy()
168 discretized_dstn.limit["supremum"] = self.supremum.copy()
169 return discretized_dstn
172class MarkovProcess(Distribution):
173 """
174 A representation of a discrete Markov process.
176 Parameters
177 ----------
178 transition_matrix : np.array
179 An array of floats representing a probability mass for
180 each state transition.
181 seed : int
182 Seed for random number generator.
184 """
186 transition_matrix = None
188 def __init__(self, transition_matrix, seed=0):
189 """
190 Initialize a discrete distribution.
192 """
193 self.transition_matrix = transition_matrix
195 # Set up the RNG
196 super().__init__(seed)
198 def draw(self, state):
199 """
200 Draw new states fromt the transition matrix.
202 Parameters
203 ----------
204 state : int or nd.array
205 The state or states (1-D array) from which to draw new states.
207 Returns
208 -------
209 new_state : int or nd.array
210 New states.
211 """
213 def sample(s):
214 return self._rng.choice(
215 self.transition_matrix.shape[1], p=self.transition_matrix[s, :]
216 )
218 array_sample = np.frompyfunc(sample, 1, 1)
220 return array_sample(state)
223class IndexDistribution(Distribution):
224 """
225 This class provides a way to define a distribution that
226 is conditional on an index.
228 The current implementation combines a defined distribution
229 class (such as Bernoulli, LogNormal, etc.) with information
230 about the conditions on the parameters of the distribution.
232 It can also wrap a list of pre-discretized distributions (previously
233 provided by TimeVaryingDiscreteDistribution) and provide the same API.
235 Parameters
236 ----------
238 engine : Distribution class
239 A Distribution subclass.
241 conditional: dict
242 Information about the conditional variation on the input parameters of the engine
243 distribution. Keys should match the arguments to the engine class constructor.
245 distributions: [DiscreteDistribution]
246 Optional. A list of discrete distributions to wrap directly.
248 seed : int
249 Seed for random number generator.
250 """
252 conditional = None
253 engine = None
255 def __init__(
256 self, engine=None, conditional=None, distributions=None, RNG=None, seed=None
257 ):
258 if RNG is None:
259 # Set up the RNG
260 super().__init__(seed)
261 else:
262 # If an RNG is received, use it in whatever state it is in.
263 self._rng = RNG
264 # The seed will still be set, even if it is not used for the RNG,
265 # for whenever self.reset() is called.
266 # Note that self.reset() will stop using the RNG that was passed
267 # and create a new one.
268 self.seed = seed
270 # Mode 1: wrapping a list of discrete distributions
271 if distributions is not None:
272 self.distributions = distributions
273 self.engine = None
274 self.conditional = None
275 self.dstns = []
276 return
278 # Mode 2: engine + conditional parameters (original IndexDistribution)
279 self.conditional = conditional if conditional is not None else {}
280 self.engine = engine
282 self.dstns = []
284 # If no engine/conditional were provided, this is an invalid state.
285 if self.engine is None and not self.conditional:
286 raise ValueError(
287 "MarkovProcess: No engine or conditional parameters provided; this should not happen in normal use."
288 )
290 # Test one item to determine case handling
291 item0 = list(self.conditional.values())[0]
293 if type(item0) is list:
294 # Create and store all the conditional distributions
295 for y in range(len(item0)):
296 cond = {key: val[y] for (key, val) in self.conditional.items()}
297 self.dstns.append(self.engine(seed=self.random_seed(), **cond))
299 elif type(item0) is float:
300 self.dstns = [self.engine(seed=self.random_seed(), **self.conditional)]
302 else:
303 raise (
304 Exception(
305 f"IndexDistribution: Unhandled case for __getitem__ access. item0: {item0}; conditional: {self.conditional}"
306 )
307 )
309 def __getitem__(self, y):
310 # Prefer discrete list mode if present
311 if hasattr(self, "distributions") and self.distributions:
312 return self.distributions[y]
313 return self.dstns[y]
315 def reset(self):
316 # Reset the main RNG and each member distribution
317 super().reset()
318 for d in self.dstns:
319 d.reset()
321 def discretize(self, N, **kwds):
322 """
323 Approximation of the distribution.
325 Parameters
326 ----------
327 N : init
328 Number of discrete points to approximate
329 continuous distribution into.
331 kwds: dict
332 Other keyword arguments passed to engine
333 distribution approx() method.
335 Returns:
336 ------------
337 dists : [DiscreteDistribution] or IndexDistribution
338 If parameterization is constant, returns a single DiscreteDistribution.
339 If parameterization varies with index, returns an IndexDistribution in
340 discrete-list mode, wrapping the corresponding discrete distributions.
341 """
343 # If already in discrete list mode, return self (already discretized)
344 if hasattr(self, "distributions") and self.distributions:
345 return self
347 # test one item to determine case handling
348 item0 = list(self.conditional.values())[0]
350 if type(item0) is float:
351 # degenerate case. Treat the parameterization as constant.
352 return self.dstns[0].discretize(N, **kwds)
354 if type(item0) is list:
355 # Return an IndexDistribution wrapping a list of discrete distributions
356 return IndexDistribution(
357 distributions=[
358 self[i].discretize(N, **kwds) for i, _ in enumerate(item0)
359 ],
360 seed=self.seed,
361 )
363 def draw(self, condition):
364 """
365 Generate arrays of draws.
366 The input is an array containing the conditions.
367 The output is an array of the same length (axis 1 dimension)
368 as the conditions containing random draws of the conditional
369 distribution.
371 Parameters
372 ----------
373 condition : np.array
374 The input conditions to the distribution.
376 Returns:
377 ------------
378 draws : np.array
379 """
380 # for now, assume that all the conditionals
381 # are of the same type.
382 # this matches the HARK 'time-varying' model architecture.
384 # If wrapping discrete distributions, draw from those
385 if hasattr(self, "distributions") and self.distributions:
386 draws = np.zeros(condition.size)
387 for c in np.unique(condition):
388 these = c == condition
389 N = np.sum(these)
390 draws[these] = self.distributions[c].draw(N)
391 return draws
393 # test one item to determine case handling
394 item0 = list(self.conditional.values())[0]
396 if type(item0) is float:
397 # degenerate case. Treat the parameterization as constant.
398 N = condition.size
400 return self.engine(seed=self.random_seed(), **self.conditional).draw(N)
402 if type(item0) is list:
403 # conditions are indices into list
404 # somewhat convoluted sampling strategy retained
405 # for test backwards compatibility
407 draws = np.zeros(condition.size)
409 for c in np.unique(condition):
410 these = c == condition
411 N = np.sum(these)
413 draws[these] = self[c].draw(N)
415 return draws