Coverage for HARK / distributions / base.py: 89%
110 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-07 05:16 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-07 05:16 +0000
1from typing import Any, Optional
3import numpy as np
4from numpy import random
7class Distribution:
8 """
9 Base class for all probability distributions
10 with seed and random number generator.
12 For discussion on random number generation and random seeds, see
13 https://docs.scipy.org/doc/scipy/tutorial/stats.html#random-number-generation
15 Parameters
16 ----------
17 seed : Optional[int]
18 Seed for random number generator.
19 """
21 def __init__(self, seed: Optional[int] = 0) -> None:
22 """
23 Generic distribution class with seed management.
25 Parameters
26 ----------
27 seed : Optional[int], optional
28 Seed for random number generator, by default None
29 generates random seed based on entropy.
31 Raises
32 ------
33 ValueError
34 Seed must be an integer type.
35 """
36 if seed is None:
37 # generate random seed
38 _seed: int = random.SeedSequence().entropy
39 elif isinstance(seed, (int, np.integer)):
40 _seed = seed
41 else:
42 raise ValueError("seed must be an integer")
44 self._seed: int = _seed
45 self._rng: random.Generator = random.default_rng(self._seed)
47 # Bounds of distribution support should be overwritten by subclasses
48 self.infimum = np.array([])
49 self.supremum = np.array([])
51 @property
52 def seed(self) -> int:
53 """
54 Seed for random number generator.
56 Returns
57 -------
58 int
59 Seed.
60 """
61 return self._seed # type: ignore
63 @seed.setter
64 def seed(self, seed: int) -> None:
65 """
66 Set seed for random number generator.
68 Parameters
69 ----------
70 seed : int
71 Seed for random number generator.
72 """
74 if isinstance(seed, (int, np.integer)):
75 self._seed = seed
76 self._rng = random.default_rng(seed)
77 else:
78 raise ValueError("seed must be an integer")
80 def reset(self) -> None:
81 """
82 Reset the random number generator of this distribution.
83 Resetting the seed will result in the same sequence of
84 random numbers being generated.
86 Parameters
87 ----------
88 """
89 self._rng = random.default_rng(self.seed)
91 def random_seed(self) -> None:
92 """
93 Generate a new random seed for this distribution.
94 """
95 self.seed = random.SeedSequence().entropy
97 def draw(self, N: int) -> np.ndarray:
98 """
99 Generate arrays of draws from this distribution.
100 If input N is a number, output is a length N array of draws from the
101 distribution. If N is a list, output is a length T list whose
102 t-th entry is a length N array of draws from the distribution[t].
104 Parameters
105 ----------
106 N : int
107 Number of draws in each row.
109 Returns:
110 ------------
111 draws : np.array or [np.array]
112 T-length list of arrays of random variable draws each of size n, or
113 a single array of size N (if sigma is a scalar).
114 """
115 return self.rvs(size=N, random_state=self._rng).T
117 def discretize(
118 self, N: int, method: str = "equiprobable", endpoints: bool = False, **kwds: Any
119 ) -> "DiscreteDistribution":
120 """
121 Discretize the distribution into N points using the specified method.
123 Parameters
124 ----------
125 N : int
126 Number of points in the discretization.
127 method : str, optional
128 Method for discretization, by default "equiprobable"
129 endpoints : bool, optional
130 Whether to include endpoints in the discretization, by default False
132 Returns
133 -------
134 discretized_dstn : DiscreteDistribution
135 Discretized distribution.
137 Raises
138 ------
139 NotImplementedError
140 If method is not implemented for this distribution.
141 """
143 approx_method = "_approx_" + method
145 if not hasattr(self, approx_method):
146 raise NotImplementedError(
147 "discretize() with method = {} not implemented for {} class".format(
148 method, self.__class__.__name__
149 )
150 )
152 approx = getattr(self, approx_method)
153 discretized_dstn = approx(N, endpoints, **kwds)
154 discretized_dstn.limit["infimum"] = self.infimum.copy()
155 discretized_dstn.limit["supremum"] = self.supremum.copy()
156 return discretized_dstn
159class MarkovProcess(Distribution):
160 """
161 A representation of a discrete Markov process.
163 Parameters
164 ----------
165 transition_matrix : np.array
166 An array of floats representing a probability mass for
167 each state transition.
168 seed : int
169 Seed for random number generator.
171 """
173 transition_matrix = None
175 def __init__(self, transition_matrix, seed=0):
176 """
177 Initialize a discrete distribution.
179 """
180 self.transition_matrix = transition_matrix
182 # Set up the RNG
183 super().__init__(seed)
185 def draw(self, state):
186 """
187 Draw new states fromt the transition matrix.
189 Parameters
190 ----------
191 state : int or nd.array
192 The state or states (1-D array) from which to draw new states.
194 Returns
195 -------
196 new_state : int or nd.array
197 New states.
198 """
200 def sample(s):
201 return self._rng.choice(
202 self.transition_matrix.shape[1], p=self.transition_matrix[s, :]
203 )
205 array_sample = np.frompyfunc(sample, 1, 1)
207 return array_sample(state)
210class IndexDistribution(Distribution):
211 """
212 This class provides a way to define a distribution that
213 is conditional on an index.
215 The current implementation combines a defined distribution
216 class (such as Bernoulli, LogNormal, etc.) with information
217 about the conditions on the parameters of the distribution.
219 It can also wrap a list of pre-discretized distributions (previously
220 provided by TimeVaryingDiscreteDistribution) and provide the same API.
222 Parameters
223 ----------
225 engine : Distribution class
226 A Distribution subclass.
228 conditional: dict
229 Information about the conditional variation on the input parameters of the engine
230 distribution. Keys should match the arguments to the engine class constructor.
232 distributions: [DiscreteDistribution]
233 Optional. A list of discrete distributions to wrap directly.
235 seed : int
236 Seed for random number generator.
237 """
239 conditional = None
240 engine = None
242 def __init__(
243 self, engine=None, conditional=None, distributions=None, RNG=None, seed=0
244 ):
245 if RNG is None:
246 # Set up the RNG
247 super().__init__(seed)
248 else:
249 # If an RNG is received, use it in whatever state it is in.
250 self._rng = RNG
251 # The seed will still be set, even if it is not used for the RNG,
252 # for whenever self.reset() is called.
253 # Note that self.reset() will stop using the RNG that was passed
254 # and create a new one.
255 self.seed = seed
257 # Mode 1: wrapping a list of discrete distributions
258 if distributions is not None:
259 self.distributions = distributions
260 self.engine = None
261 self.conditional = None
262 self.dstns = []
263 return
265 # Mode 2: engine + conditional parameters (original IndexDistribution)
266 self.conditional = conditional if conditional is not None else {}
267 self.engine = engine
269 self.dstns = []
271 # If no engine/conditional were provided, this is an invalid state.
272 if self.engine is None and not self.conditional:
273 raise ValueError(
274 "MarkovProcess: No engine or conditional parameters provided; this should not happen in normal use."
275 )
277 # Test one item to determine case handling
278 item0 = list(self.conditional.values())[0]
280 if type(item0) is list:
281 # Create and store all the conditional distributions
282 for y in range(len(item0)):
283 cond = {key: val[y] for (key, val) in self.conditional.items()}
284 self.dstns.append(
285 self.engine(seed=self._rng.integers(0, 2**31 - 1), **cond)
286 )
288 elif type(item0) is float:
289 self.dstns = [
290 self.engine(seed=self._rng.integers(0, 2**31 - 1), **self.conditional)
291 ]
293 else:
294 raise (
295 Exception(
296 f"IndexDistribution: Unhandled case for __getitem__ access. item0: {item0}; conditional: {self.conditional}"
297 )
298 )
300 def __getitem__(self, y):
301 # Prefer discrete list mode if present
302 if hasattr(self, "distributions") and self.distributions:
303 return self.distributions[y]
304 return self.dstns[y]
306 def reset(self):
307 # Reset the main RNG and each member distribution
308 super().reset()
309 for d in self.dstns:
310 d.reset()
312 def discretize(self, N, **kwds):
313 """
314 Approximation of the distribution.
316 Parameters
317 ----------
318 N : init
319 Number of discrete points to approximate
320 continuous distribution into.
322 kwds: dict
323 Other keyword arguments passed to engine
324 distribution approx() method.
326 Returns:
327 ------------
328 dists : [DiscreteDistribution] or IndexDistribution
329 If parameterization is constant, returns a single DiscreteDistribution.
330 If parameterization varies with index, returns an IndexDistribution in
331 discrete-list mode, wrapping the corresponding discrete distributions.
332 """
334 # If already in discrete list mode, return self (already discretized)
335 if hasattr(self, "distributions") and self.distributions:
336 return self
338 # test one item to determine case handling
339 item0 = list(self.conditional.values())[0]
341 if type(item0) is float:
342 # degenerate case. Treat the parameterization as constant.
343 return self.dstns[0].discretize(N, **kwds)
345 if type(item0) is list:
346 # Return an IndexDistribution wrapping a list of discrete distributions
347 return IndexDistribution(
348 distributions=[
349 self[i].discretize(N, **kwds) for i, _ in enumerate(item0)
350 ],
351 seed=self.seed,
352 )
354 def draw(self, condition):
355 """
356 Generate arrays of draws.
357 The input is an array containing the conditions.
358 The output is an array of the same length (axis 1 dimension)
359 as the conditions containing random draws of the conditional
360 distribution.
362 Parameters
363 ----------
364 condition : np.array
365 The input conditions to the distribution.
367 Returns:
368 ------------
369 draws : np.array
370 """
371 # for now, assume that all the conditionals
372 # are of the same type.
373 # this matches the HARK 'time-varying' model architecture.
375 # If wrapping discrete distributions, draw from those
376 if hasattr(self, "distributions") and self.distributions:
377 draws = np.zeros(condition.size)
378 for c in np.unique(condition):
379 these = c == condition
380 N = np.sum(these)
381 draws[these] = self.distributions[c].draw(N)
382 return draws
384 # test one item to determine case handling
385 item0 = list(self.conditional.values())[0]
387 if type(item0) is float:
388 # degenerate case. Treat the parameterization as constant.
389 N = condition.size
391 return self.engine(
392 seed=self._rng.integers(0, 2**31 - 1), **self.conditional
393 ).draw(N)
395 if type(item0) is list:
396 # conditions are indices into list
397 # somewhat convoluted sampling strategy retained
398 # for test backwards compatibility
400 draws = np.zeros(condition.size)
402 for c in np.unique(condition):
403 these = c == condition
404 N = np.sum(these)
406 draws[these] = self[c].draw(N)
408 return draws