Coverage for HARK/distributions/base.py: 89%
108 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-02 05:14 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-02 05:14 +0000
1from typing import Any, Optional
3import numpy as np
4from numpy import random
7class Distribution:
8 """
9 Base class for all probability distributions
10 with seed and random number generator.
12 For discussion on random number generation and random seeds, see
13 https://docs.scipy.org/doc/scipy/tutorial/stats.html#random-number-generation
15 Parameters
16 ----------
17 seed : Optional[int]
18 Seed for random number generator.
19 """
21 def __init__(self, seed: Optional[int] = 0) -> None:
22 """
23 Generic distribution class with seed management.
25 Parameters
26 ----------
27 seed : Optional[int], optional
28 Seed for random number generator, by default None
29 generates random seed based on entropy.
31 Raises
32 ------
33 ValueError
34 Seed must be an integer type.
35 """
36 if seed is None:
37 # generate random seed
38 _seed: int = random.SeedSequence().entropy
39 elif isinstance(seed, (int, np.integer)):
40 _seed = seed
41 else:
42 raise ValueError("seed must be an integer")
44 self._seed: int = _seed
45 self._rng: random.Generator = random.default_rng(self._seed)
47 # Bounds of distribution support should be overwritten by subclasses
48 self.infimum = np.array([])
49 self.supremum = np.array([])
51 @property
52 def seed(self) -> int:
53 """
54 Seed for random number generator.
56 Returns
57 -------
58 int
59 Seed.
60 """
61 return self._seed # type: ignore
63 @seed.setter
64 def seed(self, seed: int) -> None:
65 """
66 Set seed for random number generator.
68 Parameters
69 ----------
70 seed : int
71 Seed for random number generator.
72 """
74 if isinstance(seed, (int, np.integer)):
75 self._seed = seed
76 self._rng = random.default_rng(seed)
77 else:
78 raise ValueError("seed must be an integer")
80 def reset(self) -> None:
81 """
82 Reset the random number generator of this distribution.
83 Resetting the seed will result in the same sequence of
84 random numbers being generated.
86 Parameters
87 ----------
88 """
89 self._rng = random.default_rng(self.seed)
91 def random_seed(self) -> None:
92 """
93 Generate a new random seed for this distribution.
94 """
95 self.seed = random.SeedSequence().entropy
97 def draw(self, N: int) -> np.ndarray:
98 """
99 Generate arrays of draws from this distribution.
100 If input N is a number, output is a length N array of draws from the
101 distribution. If N is a list, output is a length T list whose
102 t-th entry is a length N array of draws from the distribution[t].
104 Parameters
105 ----------
106 N : int
107 Number of draws in each row.
109 Returns:
110 ------------
111 draws : np.array or [np.array]
112 T-length list of arrays of random variable draws each of size n, or
113 a single array of size N (if sigma is a scalar).
114 """
116 mean = self.mean() if callable(self.mean) else self.mean
117 size = (N, mean.size) if mean.size != 1 else N
118 return self.rvs(size=size, random_state=self._rng)
120 def discretize(
121 self, N: int, method: str = "equiprobable", endpoints: bool = False, **kwds: Any
122 ) -> "DiscreteDistribution":
123 """
124 Discretize the distribution into N points using the specified method.
126 Parameters
127 ----------
128 N : int
129 Number of points in the discretization.
130 method : str, optional
131 Method for discretization, by default "equiprobable"
132 endpoints : bool, optional
133 Whether to include endpoints in the discretization, by default False
135 Returns
136 -------
137 discretized_dstn : DiscreteDistribution
138 Discretized distribution.
140 Raises
141 ------
142 NotImplementedError
143 If method is not implemented for this distribution.
144 """
146 approx_method = "_approx_" + method
148 if not hasattr(self, approx_method):
149 raise NotImplementedError(
150 "discretize() with method = {} not implemented for {} class".format(
151 method, self.__class__.__name__
152 )
153 )
155 approx = getattr(self, approx_method)
156 discretized_dstn = approx(N, endpoints, **kwds)
157 discretized_dstn.limit["infimum"] = self.infimum.copy()
158 discretized_dstn.limit["supremum"] = self.supremum.copy()
159 return discretized_dstn
162class MarkovProcess(Distribution):
163 """
164 A representation of a discrete Markov process.
166 Parameters
167 ----------
168 transition_matrix : np.array
169 An array of floats representing a probability mass for
170 each state transition.
171 seed : int
172 Seed for random number generator.
174 """
176 transition_matrix = None
178 def __init__(self, transition_matrix, seed=0):
179 """
180 Initialize a discrete distribution.
182 """
183 self.transition_matrix = transition_matrix
185 # Set up the RNG
186 super().__init__(seed)
188 def draw(self, state):
189 """
190 Draw new states fromt the transition matrix.
192 Parameters
193 ----------
194 state : int or nd.array
195 The state or states (1-D array) from which to draw new states.
197 Returns
198 -------
199 new_state : int or nd.array
200 New states.
201 """
203 def sample(s):
204 return self._rng.choice(
205 self.transition_matrix.shape[1], p=self.transition_matrix[s, :]
206 )
208 array_sample = np.frompyfunc(sample, 1, 1)
210 return array_sample(state)
213class IndexDistribution(Distribution):
214 """
215 This class provides a way to define a distribution that
216 is conditional on an index.
218 The current implementation combines a defined distribution
219 class (such as Bernoulli, LogNormal, etc.) with information
220 about the conditions on the parameters of the distribution.
222 It can also wrap a list of pre-discretized distributions (previously
223 provided by TimeVaryingDiscreteDistribution) and provide the same API.
225 Parameters
226 ----------
228 engine : Distribution class
229 A Distribution subclass.
231 conditional: dict
232 Information about the conditional variation
233 on the input parameters of the engine distribution.
234 Keys should match the arguments to the engine class
235 constructor.
237 distributions: [DiscreteDistribution]
238 Optional. A list of discrete distributions to wrap directly.
240 seed : int
241 Seed for random number generator.
242 """
244 conditional = None
245 engine = None
247 def __init__(
248 self, engine=None, conditional=None, distributions=None, RNG=None, seed=0
249 ):
250 if RNG is None:
251 # Set up the RNG
252 super().__init__(seed)
253 else:
254 # If an RNG is received, use it in whatever state it is in.
255 self._rng = RNG
256 # The seed will still be set, even if it is not used for the RNG,
257 # for whenever self.reset() is called.
258 # Note that self.reset() will stop using the RNG that was passed
259 # and create a new one.
260 self.seed = seed
262 # Mode 1: wrapping a list of discrete distributions
263 if distributions is not None:
264 self.distributions = distributions
265 self.engine = None
266 self.conditional = None
267 self.dstns = []
268 return
270 # Mode 2: engine + conditional parameters (original IndexDistribution)
271 self.conditional = conditional if conditional is not None else {}
272 self.engine = engine
274 self.dstns = []
276 # If no engine/conditional were provided, this is an invalid state.
277 if self.engine is None and not self.conditional:
278 raise ValueError(
279 "MarkovProcess: No engine or conditional parameters provided; this should not happen in normal use."
280 )
282 # Test one item to determine case handling
283 item0 = list(self.conditional.values())[0]
285 if type(item0) is list:
286 # Create and store all the conditional distributions
287 for y in range(len(item0)):
288 cond = {key: val[y] for (key, val) in self.conditional.items()}
289 self.dstns.append(
290 self.engine(seed=self._rng.integers(0, 2**31 - 1), **cond)
291 )
293 elif type(item0) is float:
294 self.dstns = [
295 self.engine(seed=self._rng.integers(0, 2**31 - 1), **self.conditional)
296 ]
298 else:
299 raise (
300 Exception(
301 f"IndexDistribution: Unhandled case for __getitem__ access. item0: {item0}; conditional: {self.conditional}"
302 )
303 )
305 def __getitem__(self, y):
306 # Prefer discrete list mode if present
307 if hasattr(self, "distributions") and self.distributions:
308 return self.distributions[y]
309 return self.dstns[y]
311 def discretize(self, N, **kwds):
312 """
313 Approximation of the distribution.
315 Parameters
316 ----------
317 N : init
318 Number of discrete points to approximate
319 continuous distribution into.
321 kwds: dict
322 Other keyword arguments passed to engine
323 distribution approx() method.
325 Returns:
326 ------------
327 dists : [DiscreteDistribution] or IndexDistribution
328 If parameterization is constant, returns a single DiscreteDistribution.
329 If parameterization varies with index, returns an IndexDistribution in
330 discrete-list mode, wrapping the corresponding discrete distributions.
331 """
333 # If already in discrete list mode, return self (already discretized)
334 if hasattr(self, "distributions") and self.distributions:
335 return self
337 # test one item to determine case handling
338 item0 = list(self.conditional.values())[0]
340 if type(item0) is float:
341 # degenerate case. Treat the parameterization as constant.
342 return self.dstns[0].discretize(N, **kwds)
344 if type(item0) is list:
345 # Return an IndexDistribution wrapping a list of discrete distributions
346 return IndexDistribution(
347 distributions=[
348 self[i].discretize(N, **kwds) for i, _ in enumerate(item0)
349 ],
350 seed=self.seed,
351 )
353 def draw(self, condition):
354 """
355 Generate arrays of draws.
356 The input is an array containing the conditions.
357 The output is an array of the same length (axis 1 dimension)
358 as the conditions containing random draws of the conditional
359 distribution.
361 Parameters
362 ----------
363 condition : np.array
364 The input conditions to the distribution.
366 Returns:
367 ------------
368 draws : np.array
369 """
370 # for now, assume that all the conditionals
371 # are of the same type.
372 # this matches the HARK 'time-varying' model architecture.
374 # If wrapping discrete distributions, draw from those
375 if hasattr(self, "distributions") and self.distributions:
376 draws = np.zeros(condition.size)
377 for c in np.unique(condition):
378 these = c == condition
379 N = np.sum(these)
380 draws[these] = self.distributions[c].draw(N)
381 return draws
383 # test one item to determine case handling
384 item0 = list(self.conditional.values())[0]
386 if type(item0) is float:
387 # degenerate case. Treat the parameterization as constant.
388 N = condition.size
390 return self.engine(
391 seed=self._rng.integers(0, 2**31 - 1), **self.conditional
392 ).draw(N)
394 if type(item0) is list:
395 # conditions are indices into list
396 # somewhat convoluted sampling strategy retained
397 # for test backwards compatibility
399 draws = np.zeros(condition.size)
401 for c in np.unique(condition):
402 these = c == condition
403 N = np.sum(these)
405 draws[these] = self[c].draw(N)
407 return draws