Coverage for HARK / distributions / utils.py: 95%
175 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-10 06:19 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-10 06:19 +0000
1import math
2from warnings import warn
4import numpy as np
5from scipy import stats
7from HARK.distributions.base import IndexDistribution
8from HARK.distributions.discrete import (
9 DiscreteDistribution,
10 DiscreteDistributionLabeled,
11)
12from HARK.distributions.continuous import Normal
15# TODO: This function does not generate the limit attribute
16def approx_lognormal_gauss_hermite(N, mu=0.0, sigma=1.0, seed=None):
17 d = Normal(mu, sigma).discretize(N, method="hermite")
18 return DiscreteDistribution(d.pmv, np.exp(d.atoms), seed=seed)
21def calc_normal_style_pars_from_lognormal_pars(avg_lognormal, std_lognormal):
22 varLognormal = std_lognormal**2
23 varNormal = math.log(1 + varLognormal / avg_lognormal**2)
24 avgNormal = math.log(avg_lognormal) - varNormal * 0.5
25 std_normal = math.sqrt(varNormal)
26 return avgNormal, std_normal
29def calc_lognormal_style_pars_from_normal_pars(mu_normal, std_normal):
30 varNormal = std_normal**2
31 avg_lognormal = math.exp(mu_normal + varNormal * 0.5)
32 varLognormal = (math.exp(varNormal) - 1) * avg_lognormal**2
33 std_lognormal = math.sqrt(varLognormal)
34 return avg_lognormal, std_lognormal
37def approx_beta(N, a=1.0, b=1.0):
38 """
39 Calculate a discrete approximation to the beta distribution. May be quite
40 slow, as it uses a rudimentary numeric integration method to generate the
41 discrete approximation.
43 Parameters
44 ----------
45 N : int
46 Size of discrete space vector to be returned.
47 a : float
48 First shape parameter (sometimes called alpha).
49 b : float
50 Second shape parameter (sometimes called beta).
52 Returns
53 -------
54 d : DiscreteDistribution
55 Probability associated with each point in array of discrete
56 points for discrete probability mass function.
57 """
58 P = 1000
59 vals = np.reshape(stats.beta.ppf(np.linspace(0.0, 1.0, N * P), a, b), (N, P))
60 atoms = np.mean(vals, axis=1)
61 pmv = np.ones(N) / float(N)
62 return DiscreteDistribution(pmv, atoms)
65def make_markov_approx_to_normal(x_grid, mu, sigma, K=351, bound=3.5):
66 """
67 Creates an approximation to a normal distribution with mean mu and standard
68 deviation sigma, returning a stochastic vector called p_vec, corresponding
69 to values in x_grid. If a RV is distributed x~N(mu,sigma), then the expectation
70 of a continuous function f() is E[f(x)] = numpy.dot(p_vec,f(x_grid)).
72 Parameters
73 ----------
74 x_grid: numpy.array
75 A sorted 1D array of floats representing discrete values that a normally
76 distributed RV could take on.
77 mu: float
78 Mean of the normal distribution to be approximated.
79 sigma: float
80 Standard deviation of the normal distribution to be approximated.
81 K: int
82 Number of points in the normal distribution to sample.
83 bound: float
84 Truncation bound of the normal distribution, as +/- bound*sigma.
86 Returns
87 -------
88 p_vec: numpy.array
89 A stochastic vector with probability weights for each x in x_grid.
90 """
91 x_n = x_grid.size # Number of points in the outcome grid
92 lower_bound = -bound # Lower bound of normal draws to consider, in SD
93 upper_bound = bound # Upper bound of normal draws to consider, in SD
94 raw_sample = np.linspace(
95 lower_bound, upper_bound, K
96 ) # Evenly spaced draws between bounds
97 f_weights = stats.norm.pdf(raw_sample) # Relative probability of each draw
98 sample = mu + sigma * raw_sample # Adjusted bounds, given mean and stdev
99 w_vec = np.zeros(x_n) # A vector of outcome weights
101 # Find the relative position of each of the draws
102 sample_pos = np.searchsorted(x_grid, sample)
103 sample_pos[sample_pos < 1] = 1
104 sample_pos[sample_pos > x_n - 1] = x_n - 1
106 # Make arrays of the x_grid point directly above and below each draw
107 bot = x_grid[sample_pos - 1]
108 top = x_grid[sample_pos]
109 alpha = (sample - bot) / (top - bot)
111 # Keep the weights (alpha) in bounds
112 alpha_clipped = np.clip(alpha, 0.0, 1.0)
114 # Loop through each x_grid point and add up the probability that each nearby
115 # draw contributes to it (accounting for distance)
116 for j in range(1, x_n):
117 c = sample_pos == j
118 w_vec[j - 1] = w_vec[j - 1] + np.dot(f_weights[c], 1.0 - alpha_clipped[c])
119 w_vec[j] = w_vec[j] + np.dot(f_weights[c], alpha_clipped[c])
121 # Reweight the probabilities so they sum to 1
122 W = np.sum(w_vec)
123 p_vec = w_vec / W
125 # Check for obvious errors, and return p_vec
126 assert (
127 (np.all(p_vec >= 0.0))
128 and (np.all(p_vec <= 1.0))
129 and (np.isclose(np.sum(p_vec), 1.0))
130 )
131 return p_vec
134def make_markov_approx_to_normal_by_monte_carlo(x_grid, mu, sigma, N_draws=10000):
135 """
136 Creates an approximation to a normal distribution with mean mu and standard
137 deviation sigma, by Monte Carlo.
138 Returns a stochastic vector called p_vec, corresponding
139 to values in x_grid. If a RV is distributed x~N(mu,sigma), then the expectation
140 of a continuous function f() is E[f(x)] = numpy.dot(p_vec,f(x_grid)).
142 Parameters
143 ----------
144 x_grid: numpy.array
145 A sorted 1D array of floats representing discrete values that a normally
146 distributed RV could take on.
147 mu: float
148 Mean of the normal distribution to be approximated.
149 sigma: float
150 Standard deviation of the normal distribution to be approximated.
151 N_draws: int
152 Number of draws to use in Monte Carlo.
154 Returns
155 -------
156 p_vec: numpy.array
157 A stochastic vector with probability weights for each x in x_grid.
158 """
160 # Take random draws from the desired normal distribution
161 random_draws = np.random.normal(loc=mu, scale=sigma, size=N_draws)
163 # Compute the distance between the draws and points in x_grid
164 distance = np.abs(x_grid[:, np.newaxis] - random_draws[np.newaxis, :])
166 # Find the indices of the points in x_grid that are closest to the draws
167 distance_minimizing_index = np.argmin(distance, axis=0)
169 # For each point in x_grid, the approximate probability of that point is the number
170 # of Monte Carlo draws that are closest to that point
171 p_vec = np.zeros_like(x_grid)
172 for p_index, p in enumerate(p_vec):
173 p_vec[p_index] = np.sum(distance_minimizing_index == p_index) / N_draws
175 # Check for obvious errors, and return p_vec
176 assert (
177 (np.all(p_vec >= 0.0))
178 and (np.all(p_vec <= 1.0))
179 and (np.isclose(np.sum(p_vec), 1.0))
180 )
181 return p_vec
184def make_tauchen_ar1(N, sigma=1.0, ar_1=0.9, bound=3.0, inflendpoint=True):
185 """
186 Function to return a discretized version of an AR1 process.
187 See http://www.fperri.net/TEACHING/macrotheory08/numerical.pdf for details
189 Parameters
190 ----------
191 N: int
192 Size of discretized grid
193 sigma: float
194 Standard deviation of the error term
195 ar_1: float
196 AR1 coefficient
197 bound: float
198 The highest (lowest) grid point will be bound (-bound) multiplied by the unconditional
199 standard deviation of the process
200 inflendpoint: Bool
201 If True: implement the standard method as in Tauchen (1986):
202 assign the probability of jumping to a point outside the grid to the closest endpoint
203 If False: implement an alternative method:
204 discard the probability of jumping to a point outside the grid, effectively
205 reassigning it to the remaining points in proportion to their probability of being reached
207 Returns
208 -------
209 y: np.array
210 Grid points on which the discretized process takes values
211 trans_matrix: np.array
212 Markov transition array for the discretized process
213 """
214 yN = bound * sigma / ((1 - ar_1**2) ** 0.5)
215 y = np.linspace(-yN, yN, N)
216 d = y[1] - y[0]
217 cuts = (y[1:] + y[:-1]) / 2.0
218 if inflendpoint:
219 cuts = np.concatenate(([-np.inf], cuts, [np.inf]))
220 else:
221 cuts = np.concatenate(([y[0] - d / 2], cuts, [y[-1] + d / 2]))
222 dist = np.reshape(cuts, (1, N + 1)) - np.reshape(ar_1 * y, (N, 1))
223 dist /= sigma
224 cdf_array = stats.norm.cdf(dist)
225 sf_array = stats.norm.sf(dist)
226 trans = cdf_array[:, 1:] - cdf_array[:, :-1]
227 trans_alt = sf_array[:, :-1] - sf_array[:, 1:]
228 trans_matrix = np.maximum(trans, trans_alt)
229 trans_matrix /= np.sum(trans_matrix, axis=1, keepdims=True)
230 return y, trans_matrix
233# ================================================================================
234# ==================== Functions for manipulating discrete distributions =========
235# ================================================================================
238def _bounds_with_added_atom(distribution, x, atoms):
239 """
240 Compute (infimum, supremum) for a distribution after a new atom ``x`` is added.
242 Falls back to taking min/max over the augmented atoms array when the source
243 distribution does not expose ``infimum``/``supremum`` in its ``limit`` dict.
244 """
245 temp_x = np.array(x, ndmin=1)
246 try:
247 infimum = np.array(
248 [
249 np.minimum(temp_x[i], distribution.limit["infimum"][i])
250 for i in range(temp_x.size)
251 ]
252 )
253 except KeyError:
254 infimum = np.min(atoms, axis=-1, keepdims=True)
255 try:
256 supremum = np.array(
257 [
258 np.maximum(temp_x[i], distribution.limit["supremum"][i])
259 for i in range(temp_x.size)
260 ]
261 )
262 except KeyError:
263 supremum = np.max(atoms, axis=-1, keepdims=True)
264 return infimum, supremum
267def _finalize_with_added_atom(distribution, atoms, x, p, method, sort):
268 """Build the new ``DiscreteDistribution`` after a value ``x`` (probability ``p``)
269 has been spliced onto ``distribution``. Shared by ``add_discrete_outcome`` and
270 ``add_discrete_outcome_constant_mean``.
271 """
272 pmv = np.append(p, distribution.pmv * (1 - p))
273 if sort:
274 indices = np.argsort(atoms)
275 atoms = atoms[indices]
276 pmv = pmv[indices]
277 infimum, supremum = _bounds_with_added_atom(distribution, x, atoms)
278 limit = {
279 "dist": distribution,
280 "method": method,
281 "x": x,
282 "p": p,
283 "infimum": infimum,
284 "supremum": supremum,
285 }
286 return DiscreteDistribution(pmv, atoms, seed=distribution.seed, limit=limit)
289def add_discrete_outcome_constant_mean(distribution, x, p, sort=False):
290 """
291 Adds a discrete outcome of x with probability p to an existing distribution,
292 holding constant the relative probabilities of other outcomes and overall mean.
294 Parameters
295 ----------
296 distribution : DiscreteDistribution
297 A one-dimensional DiscreteDistribution.
298 x : float
299 The new value to be added to the distribution.
300 p : float
301 The probability of the discrete outcome x occuring.
302 sort: bool
303 Whether or not to sort atoms before returning it
305 Returns
306 -------
307 d : DiscreteDistribution
308 Probability associated with each point in array of discrete
309 points for discrete probability mass function.
310 """
311 if (
312 isinstance(distribution, IndexDistribution)
313 and hasattr(distribution, "distributions")
314 and distribution.distributions
315 ):
316 # apply recursively on all the internal distributions
317 return IndexDistribution(
318 distributions=[
319 add_discrete_outcome_constant_mean(d, x, p, sort=sort)
320 for d in distribution.distributions
321 ],
322 seed=distribution.seed,
323 )
325 atoms = np.append(x, distribution.atoms * (1 - p * x) / (1 - p))
326 return _finalize_with_added_atom(
327 distribution, atoms, x, p, "add_discrete_outcome_constant_mean", sort
328 )
331def add_discrete_outcome(distribution, x, p, sort=False):
332 """
333 Adds a discrete outcome of x with probability p to an existing distribution,
334 holding constant the relative probabilities of other outcomes.
336 Parameters
337 ----------
338 distribution : DiscreteDistribution
339 One-dimensional distribution to which the outcome is to be added.
340 x : float
341 The new value to be added to the distribution.
342 p : float
343 The probability of the discrete outcome x occuring.
345 Returns
346 -------
347 d : DiscreteDistribution
348 Probability associated with each point in array of discrete
349 points for discrete probability mass function.
350 """
352 atoms = np.append(x, distribution.atoms)
353 return _finalize_with_added_atom(
354 distribution, atoms, x, p, "add_discrete_outcome", sort
355 )
358def combine_indep_dstns(*distributions, seed=None):
359 """
360 Given n independent vector-valued discrete distributions, construct their joint discrete distribution.
361 Can take multivariate discrete distributions as inputs.
363 Parameters
364 ----------
365 distributions : DiscreteDistribution
366 Arbitrary number of discrete distributions to combine. Their realizations must be
367 vector-valued (for each D in distributions, it must be the case that len(D.dim())==1).
368 seed : int, optional
369 Value to use as the RNG seed for the combined distribution, default is 0.
371 Returns
372 -------
373 A DiscreteDistribution representing the joint distribution of the given
374 random variables.
375 """
376 # Get information on the distributions
377 dist_lengths = ()
378 dist_dims = ()
379 dist_is_labeled = ()
380 var_labels = ()
381 for dist in distributions:
382 if len(dist.dim()) > 1:
383 raise NotImplementedError(
384 "We currently only support combining vector-valued distributions."
385 )
387 dist_dims += (dist.dim(),)
388 dist_lengths += (len(dist.pmv),)
390 labeled = isinstance(dist, DiscreteDistributionLabeled)
391 dist_is_labeled += (labeled,)
392 if labeled:
393 var_labels += tuple(dist.dataset.data_vars.keys())
394 else:
395 var_labels += tuple([""] * dist.dim()[0])
397 dstn_count = len(distributions)
399 all_labeled = all(dist_is_labeled)
400 labels_are_unique = len(var_labels) == len(set(var_labels))
402 # We need the combinations of indices of realizations in all
403 # distributions
404 inds = np.meshgrid(
405 *[np.array(range(l), dtype=int) for l in dist_lengths], indexing="ij"
406 )
407 inds = [x.flatten() for x in inds]
409 atoms_out = []
410 P_temp = []
411 for i, ind_vec in enumerate(inds):
412 atoms_out += [distributions[i].atoms[..., ind_vec]]
413 P_temp += [distributions[i].pmv[ind_vec]]
415 atoms_out = np.concatenate(atoms_out, axis=0)
416 P_temp = np.stack(P_temp, axis=0)
417 P_out = np.prod(P_temp, axis=0)
419 assert np.isclose(np.sum(P_out), 1), "Probabilities do not sum to 1!"
421 # Make the limit dictionary
422 infimum = np.concatenate(
423 [distributions[i].limit["infimum"] for i in range(dstn_count)]
424 )
425 supremum = np.concatenate(
426 [distributions[i].limit["supremum"] for i in range(dstn_count)]
427 )
428 limit = {
429 "dist": distributions,
430 "method": "combine_indep_dstns",
431 "infimum": infimum,
432 "supremum": supremum,
433 }
435 if all_labeled and labels_are_unique:
436 combined_dstn = DiscreteDistributionLabeled(
437 pmv=P_out,
438 atoms=atoms_out,
439 var_names=var_labels,
440 limit=limit,
441 seed=seed,
442 )
443 else:
444 if all_labeled and not labels_are_unique:
445 warn(
446 "There are duplicated labels in the provided distributions. Returning a non-labeled combination"
447 )
448 combined_dstn = DiscreteDistribution(P_out, atoms_out, limit=limit, seed=seed)
450 return combined_dstn
453def expected_with_loop(dstn, func=None, *args, **kwargs):
454 """
455 Expectation of a function, given an array of configurations of its inputs
456 along with a DiscreteDistribution object that specifies the probability
457 of each configuration. Computation is performed by looping over each atom
458 of the distribution and evaluating function one at a time. This approach is
459 broadly compatible with any function, but is slow because of the loop.
461 If func is relatively simple, and particularly if it does not involve array
462 operations like tiling, reshaping, or logical indexing, consider using expected
463 instead of expected_with_loop. That function evaluates all atoms simultaneously,
464 avoiding the costly loop, but with reduced compatibility with "complex" operations.
466 Parameters
467 ----------
468 dstn : DiscreteDistribution or DiscreteDistributionLabeled
469 The distribution over which the function is to be evaluated.
470 func : function or None
471 The function to be evaluated. If dstn is a DiscreteDistribution, then this
472 function should take an array of shape dstn.dim() and return either arrays
473 of arbitrary shape or scalars. If dstn is a DiscreteDistributionLabeled, then
474 the function should take an dictionary-like object (like an xr.dataset) and
475 index into it with variable names. In either case, the function may also
476 take other arguments \\*args. Defaults to identity function.
477 \\*args :
478 Other inputs for func, representing the non-stochastic arguments.
479 The expectation is computed at ``f(dstn, *args, **kwargs)``.
480 \\*kwargs :
481 Other keyword inputs for func, representing the non-stochastic arguments.
482 The expectation is computed at ``f(dstn, *args, **kwargs)``.
484 Returns
485 -------
486 f_exp : np.array or scalar
487 The expectation of the function at the queried values.
488 Scalar if only one value.
489 """
490 func = func or (lambda x: x)
492 if hasattr(dstn, "dataset"): # cheap test for DiscreteDistributionLabeled
493 f_query = []
494 for i in range(len(dstn.pmv)):
495 temp_dict = {
496 key: float(dstn.variables[key][i]) for key in dstn.variables.keys()
497 }
498 f_query.append(func(temp_dict, *args, **kwargs))
499 else:
500 f_query = [
501 func(dstn.atoms[..., i], *args, **kwargs) for i in range(len(dstn.pmv))
502 ]
504 f_query = np.stack(f_query, axis=-1)
506 # From the numpy.dot documentation:
507 # If both a and b are 1-D arrays, it is inner product of vectors (without complex conjugation).
508 # If a is an N-D array and b is a 1-D array, it is a sum product over the last axis of a and b.
509 # Thus, if func returns scalars, f_exp will be a scalar and if it returns arrays f_exp
510 # will be an array of the same shape.
511 f_exp = np.dot(f_query, dstn.pmv)
513 return f_exp
516def distr_of_function(dstn, func=lambda x: x, *args):
517 """
518 Finds the distribution of a random variable Y that is a function
519 of discrete random variable atoms, Y=f(atoms).
521 Parameters
522 ----------
523 dstn : DiscreteDistribution
524 The distribution over which the function is to be evaluated.
525 func : function
526 The function to be evaluated.
527 This function should take an array of shape dstn.dim().
528 It may also take other arguments \\*args.
529 \\*args :
530 Additional non-stochastic arguments for func,
531 The function is computed at ``f(dstn, *args)``.
533 Returns
534 -------
535 f_dstn : DiscreteDistribution
536 The distribution of func(dstn).
537 """
538 # Apply function to every event realization
539 f_query = [func(dstn.atoms[..., i], *args) for i in range(len(dstn.pmv))]
541 # Stack results along their last (new) axis
542 f_query = np.stack(f_query, axis=-1)
544 f_dstn = DiscreteDistribution(dstn.pmv, f_query)
546 return f_dstn
549def expected(func=None, dstn=None, args=(), vectorized=True, **kwargs):
550 """
551 Compute the expectation of a function, given an array of configurations of its
552 inputs along with a DiscreteDistribution object that specifies the probability
553 of each configuration.
555 If the function you want to evaluate uses complex array operations, such as
556 tiling or logical indexing, pass `vectorized=False`. In that case, expectations
557 are calculated by looping over each atom in the distribution. Otherwise, this
558 function uses array operations and tries to compute all atoms simultaneously.
560 Parameters
561 ----------
562 func : function
563 The function to be evaluated. This function should take the full array of
564 distribution values and return either arrays of arbitrary shape or scalars.
565 It may also take other arguments ``*args``.
566 dstn : DiscreteDistribution or DiscreteDistributionLabeled
567 The distribution over which the function is to be evaluated.
568 args : tuple
569 Other inputs for func, representing the non-stochastic arguments.
570 The expectation is computed at ``f(dstn, *args)``.
571 vectorized : bool, optional
572 Whether func is vectorizable (True, default) or requires looped evaluation.
573 labels : bool, optional
574 For ``DiscreteDistributionLabeled`` only. If True (default), func
575 receives a dict of labeled arrays. If False, func receives the raw
576 numpy atoms array, making DDL compatible with functions written for
577 ``DiscreteDistribution``.
578 **kwargs :
579 Additional keyword arguments forwarded to ``dist.expected()``.
580 For ``DiscreteDistributionLabeled``, these are passed through to
581 the user-supplied *func* along with the xarray Dataset.
583 Returns
584 -------
585 f_exp : np.array or scalar
586 The expectation of the function at the queried values.
587 Scalar if only one value.
588 """
589 if not isinstance(dstn, DiscreteDistribution):
590 raise TypeError(
591 f"expected(): 'dstn' must be a DiscreteDistribution or "
592 f"DiscreteDistributionLabeled, got {type(dstn).__name__}."
593 )
595 if not isinstance(args, tuple):
596 args = (args,)
598 if not vectorized:
599 loop_kwargs = kwargs.copy()
600 loop_kwargs.pop("labels", None)
601 return expected_with_loop(dstn, func, *args, **loop_kwargs)
603 if args:
604 if kwargs:
605 return dstn.expected(func, *args, **kwargs)
606 return dstn.expected(func, *args)
608 if kwargs:
609 return dstn.expected(func, **kwargs)
610 return dstn.expected(func)