Coverage for HARK / distributions / utils.py: 92%
176 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-08 05:31 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-08 05:31 +0000
1import math
2from warnings import warn
4import numpy as np
5from scipy import stats
7from HARK.distributions.base import IndexDistribution
8from HARK.distributions.discrete import (
9 DiscreteDistribution,
10 DiscreteDistributionLabeled,
11)
12from HARK.distributions.continuous import Normal
15# TODO: This function does not generate the limit attribute
16def approx_lognormal_gauss_hermite(N, mu=0.0, sigma=1.0, seed=None):
17 d = Normal(mu, sigma).discretize(N, method="hermite")
18 return DiscreteDistribution(d.pmv, np.exp(d.atoms), seed=seed)
21def calc_normal_style_pars_from_lognormal_pars(avg_lognormal, std_lognormal):
22 varLognormal = std_lognormal**2
23 varNormal = math.log(1 + varLognormal / avg_lognormal**2)
24 avgNormal = math.log(avg_lognormal) - varNormal * 0.5
25 std_normal = math.sqrt(varNormal)
26 return avgNormal, std_normal
29def calc_lognormal_style_pars_from_normal_pars(mu_normal, std_normal):
30 varNormal = std_normal**2
31 avg_lognormal = math.exp(mu_normal + varNormal * 0.5)
32 varLognormal = (math.exp(varNormal) - 1) * avg_lognormal**2
33 std_lognormal = math.sqrt(varLognormal)
34 return avg_lognormal, std_lognormal
37def approx_beta(N, a=1.0, b=1.0):
38 """
39 Calculate a discrete approximation to the beta distribution. May be quite
40 slow, as it uses a rudimentary numeric integration method to generate the
41 discrete approximation.
43 Parameters
44 ----------
45 N : int
46 Size of discrete space vector to be returned.
47 a : float
48 First shape parameter (sometimes called alpha).
49 b : float
50 Second shape parameter (sometimes called beta).
52 Returns
53 -------
54 d : DiscreteDistribution
55 Probability associated with each point in array of discrete
56 points for discrete probability mass function.
57 """
58 P = 1000
59 vals = np.reshape(stats.beta.ppf(np.linspace(0.0, 1.0, N * P), a, b), (N, P))
60 atoms = np.mean(vals, axis=1)
61 pmv = np.ones(N) / float(N)
62 return DiscreteDistribution(pmv, atoms)
65def make_markov_approx_to_normal(x_grid, mu, sigma, K=351, bound=3.5):
66 """
67 Creates an approximation to a normal distribution with mean mu and standard
68 deviation sigma, returning a stochastic vector called p_vec, corresponding
69 to values in x_grid. If a RV is distributed x~N(mu,sigma), then the expectation
70 of a continuous function f() is E[f(x)] = numpy.dot(p_vec,f(x_grid)).
72 Parameters
73 ----------
74 x_grid: numpy.array
75 A sorted 1D array of floats representing discrete values that a normally
76 distributed RV could take on.
77 mu: float
78 Mean of the normal distribution to be approximated.
79 sigma: float
80 Standard deviation of the normal distribution to be approximated.
81 K: int
82 Number of points in the normal distribution to sample.
83 bound: float
84 Truncation bound of the normal distribution, as +/- bound*sigma.
86 Returns
87 -------
88 p_vec: numpy.array
89 A stochastic vector with probability weights for each x in x_grid.
90 """
91 x_n = x_grid.size # Number of points in the outcome grid
92 lower_bound = -bound # Lower bound of normal draws to consider, in SD
93 upper_bound = bound # Upper bound of normal draws to consider, in SD
94 raw_sample = np.linspace(
95 lower_bound, upper_bound, K
96 ) # Evenly spaced draws between bounds
97 f_weights = stats.norm.pdf(raw_sample) # Relative probability of each draw
98 sample = mu + sigma * raw_sample # Adjusted bounds, given mean and stdev
99 w_vec = np.zeros(x_n) # A vector of outcome weights
101 # Find the relative position of each of the draws
102 sample_pos = np.searchsorted(x_grid, sample)
103 sample_pos[sample_pos < 1] = 1
104 sample_pos[sample_pos > x_n - 1] = x_n - 1
106 # Make arrays of the x_grid point directly above and below each draw
107 bot = x_grid[sample_pos - 1]
108 top = x_grid[sample_pos]
109 alpha = (sample - bot) / (top - bot)
111 # Keep the weights (alpha) in bounds
112 alpha_clipped = np.clip(alpha, 0.0, 1.0)
114 # Loop through each x_grid point and add up the probability that each nearby
115 # draw contributes to it (accounting for distance)
116 for j in range(1, x_n):
117 c = sample_pos == j
118 w_vec[j - 1] = w_vec[j - 1] + np.dot(f_weights[c], 1.0 - alpha_clipped[c])
119 w_vec[j] = w_vec[j] + np.dot(f_weights[c], alpha_clipped[c])
121 # Reweight the probabilities so they sum to 1
122 W = np.sum(w_vec)
123 p_vec = w_vec / W
125 # Check for obvious errors, and return p_vec
126 assert (
127 (np.all(p_vec >= 0.0))
128 and (np.all(p_vec <= 1.0))
129 and (np.isclose(np.sum(p_vec), 1.0))
130 )
131 return p_vec
134def make_markov_approx_to_normal_by_monte_carlo(x_grid, mu, sigma, N_draws=10000):
135 """
136 Creates an approximation to a normal distribution with mean mu and standard
137 deviation sigma, by Monte Carlo.
138 Returns a stochastic vector called p_vec, corresponding
139 to values in x_grid. If a RV is distributed x~N(mu,sigma), then the expectation
140 of a continuous function f() is E[f(x)] = numpy.dot(p_vec,f(x_grid)).
142 Parameters
143 ----------
144 x_grid: numpy.array
145 A sorted 1D array of floats representing discrete values that a normally
146 distributed RV could take on.
147 mu: float
148 Mean of the normal distribution to be approximated.
149 sigma: float
150 Standard deviation of the normal distribution to be approximated.
151 N_draws: int
152 Number of draws to use in Monte Carlo.
154 Returns
155 -------
156 p_vec: numpy.array
157 A stochastic vector with probability weights for each x in x_grid.
158 """
160 # Take random draws from the desired normal distribution
161 random_draws = np.random.normal(loc=mu, scale=sigma, size=N_draws)
163 # Compute the distance between the draws and points in x_grid
164 distance = np.abs(x_grid[:, np.newaxis] - random_draws[np.newaxis, :])
166 # Find the indices of the points in x_grid that are closest to the draws
167 distance_minimizing_index = np.argmin(distance, axis=0)
169 # For each point in x_grid, the approximate probability of that point is the number
170 # of Monte Carlo draws that are closest to that point
171 p_vec = np.zeros_like(x_grid)
172 for p_index, p in enumerate(p_vec):
173 p_vec[p_index] = np.sum(distance_minimizing_index == p_index) / N_draws
175 # Check for obvious errors, and return p_vec
176 assert (
177 (np.all(p_vec >= 0.0))
178 and (np.all(p_vec <= 1.0))
179 and (np.isclose(np.sum(p_vec), 1.0))
180 )
181 return p_vec
184def make_tauchen_ar1(N, sigma=1.0, ar_1=0.9, bound=3.0, inflendpoint=True):
185 """
186 Function to return a discretized version of an AR1 process.
187 See http://www.fperri.net/TEACHING/macrotheory08/numerical.pdf for details
189 Parameters
190 ----------
191 N: int
192 Size of discretized grid
193 sigma: float
194 Standard deviation of the error term
195 ar_1: float
196 AR1 coefficient
197 bound: float
198 The highest (lowest) grid point will be bound (-bound) multiplied by the unconditional
199 standard deviation of the process
200 inflendpoint: Bool
201 If True: implement the standard method as in Tauchen (1986):
202 assign the probability of jumping to a point outside the grid to the closest endpoint
203 If False: implement an alternative method:
204 discard the probability of jumping to a point outside the grid, effectively
205 reassigning it to the remaining points in proportion to their probability of being reached
207 Returns
208 -------
209 y: np.array
210 Grid points on which the discretized process takes values
211 trans_matrix: np.array
212 Markov transition array for the discretized process
213 """
214 yN = bound * sigma / ((1 - ar_1**2) ** 0.5)
215 y = np.linspace(-yN, yN, N)
216 d = y[1] - y[0]
217 cuts = (y[1:] + y[:-1]) / 2.0
218 if inflendpoint:
219 cuts = np.concatenate(([-np.inf], cuts, [np.inf]))
220 else:
221 cuts = np.concatenate(([y[0] - d / 2], cuts, [y[-1] + d / 2]))
222 dist = np.reshape(cuts, (1, N + 1)) - np.reshape(ar_1 * y, (N, 1))
223 dist /= sigma
224 cdf_array = stats.norm.cdf(dist)
225 sf_array = stats.norm.sf(dist)
226 trans = cdf_array[:, 1:] - cdf_array[:, :-1]
227 trans_alt = sf_array[:, :-1] - sf_array[:, 1:]
228 trans_matrix = np.maximum(trans, trans_alt)
229 trans_matrix /= np.sum(trans_matrix, axis=1, keepdims=True)
230 return y, trans_matrix
233# ================================================================================
234# ==================== Functions for manipulating discrete distributions =========
235# ================================================================================
238def add_discrete_outcome_constant_mean(distribution, x, p, sort=False):
239 """
240 Adds a discrete outcome of x with probability p to an existing distribution,
241 holding constant the relative probabilities of other outcomes and overall mean.
243 Parameters
244 ----------
245 distribution : DiscreteDistribution
246 A one-dimensional DiscreteDistribution.
247 x : float
248 The new value to be added to the distribution.
249 p : float
250 The probability of the discrete outcome x occuring.
251 sort: bool
252 Whether or not to sort atoms before returning it
254 Returns
255 -------
256 d : DiscreteDistribution
257 Probability associated with each point in array of discrete
258 points for discrete probability mass function.
259 """
260 if (
261 isinstance(distribution, IndexDistribution)
262 and hasattr(distribution, "distributions")
263 and distribution.distributions
264 ):
265 # apply recursively on all the internal distributions
266 return IndexDistribution(
267 distributions=[
268 add_discrete_outcome_constant_mean(d, x, p)
269 for d in distribution.distributions
270 ],
271 seed=distribution.seed,
272 )
274 else:
275 atoms = np.append(x, distribution.atoms * (1 - p * x) / (1 - p))
276 pmv = np.append(p, distribution.pmv * (1 - p))
278 if sort:
279 indices = np.argsort(atoms)
280 atoms = atoms[indices]
281 pmv = pmv[indices]
283 # Update infimum and supremum
284 temp_x = np.array(x, ndmin=1)
285 try:
286 infimum = np.array(
287 [
288 np.minimum(temp_x[i], distribution.limit["infimum"][i])
289 for i in range(temp_x.size)
290 ]
291 )
292 except KeyError:
293 infimum = np.min(atoms, axis=-1, keepdims=True)
294 try:
295 supremum = np.array(
296 [
297 np.maximum(temp_x[i], distribution.limit["supremum"][i])
298 for i in range(temp_x.size)
299 ]
300 )
301 except KeyError:
302 supremum = np.max(atoms, axis=-1, keepdims=True)
304 limit = {
305 "dist": distribution,
306 "method": "add_discrete_outcome_constant_mean",
307 "x": x,
308 "p": p,
309 "infimum": infimum,
310 "supremum": supremum,
311 }
313 return DiscreteDistribution(pmv, atoms, seed=distribution.seed, limit=limit)
316def add_discrete_outcome(distribution, x, p, sort=False):
317 """
318 Adds a discrete outcome of x with probability p to an existing distribution,
319 holding constant the relative probabilities of other outcomes.
321 Parameters
322 ----------
323 distribution : DiscreteDistribution
324 One-dimensional distribution to which the outcome is to be added.
325 x : float
326 The new value to be added to the distribution.
327 p : float
328 The probability of the discrete outcome x occuring.
330 Returns
331 -------
332 d : DiscreteDistribution
333 Probability associated with each point in array of discrete
334 points for discrete probability mass function.
335 """
337 atoms = np.append(x, distribution.atoms)
338 pmv = np.append(p, distribution.pmv * (1 - p))
340 if sort:
341 indices = np.argsort(atoms)
342 atoms = atoms[indices]
343 pmv = pmv[indices]
345 # Update infimum and supremum
346 temp_x = np.array(x, ndmin=1)
347 try:
348 infimum = np.array(
349 [
350 np.minimum(temp_x[i], distribution.limit["infimum"][i])
351 for i in range(temp_x.size)
352 ]
353 )
354 except KeyError:
355 infimum = np.min(atoms, axis=-1, keepdims=True)
356 try:
357 supremum = np.array(
358 [
359 np.maximum(temp_x[i], distribution.limit["supremum"][i])
360 for i in range(temp_x.size)
361 ]
362 )
363 except KeyError:
364 supremum = np.max(atoms, axis=-1, keepdims=True)
366 limit = {
367 "dist": distribution,
368 "method": "add_discrete_outcome",
369 "x": x,
370 "p": p,
371 "infimum": infimum,
372 "supremum": supremum,
373 }
375 return DiscreteDistribution(pmv, atoms, seed=distribution.seed, limit=limit)
378def combine_indep_dstns(*distributions, seed=None):
379 """
380 Given n independent vector-valued discrete distributions, construct their joint discrete distribution.
381 Can take multivariate discrete distributions as inputs.
383 Parameters
384 ----------
385 distributions : DiscreteDistribution
386 Arbitrary number of discrete distributions to combine. Their realizations must be
387 vector-valued (for each D in distributions, it must be the case that len(D.dim())==1).
388 seed : int, optional
389 Value to use as the RNG seed for the combined distribution, default is 0.
391 Returns
392 -------
393 A DiscreteDistribution representing the joint distribution of the given
394 random variables.
395 """
396 # Get information on the distributions
397 dist_lengths = ()
398 dist_dims = ()
399 dist_is_labeled = ()
400 var_labels = ()
401 for dist in distributions:
402 if len(dist.dim()) > 1:
403 raise NotImplementedError(
404 "We currently only support combining vector-valued distributions."
405 )
407 dist_dims += (dist.dim(),)
408 dist_lengths += (len(dist.pmv),)
410 labeled = isinstance(dist, DiscreteDistributionLabeled)
411 dist_is_labeled += (labeled,)
412 if labeled:
413 var_labels += tuple(dist.dataset.data_vars.keys())
414 else:
415 var_labels += tuple([""] * dist.dim()[0])
417 dstn_count = len(distributions)
419 all_labeled = all(dist_is_labeled)
420 labels_are_unique = len(var_labels) == len(set(var_labels))
422 # We need the combinations of indices of realizations in all
423 # distributions
424 inds = np.meshgrid(
425 *[np.array(range(l), dtype=int) for l in dist_lengths], indexing="ij"
426 )
427 inds = [x.flatten() for x in inds]
429 atoms_out = []
430 P_temp = []
431 for i, ind_vec in enumerate(inds):
432 atoms_out += [distributions[i].atoms[..., ind_vec]]
433 P_temp += [distributions[i].pmv[ind_vec]]
435 atoms_out = np.concatenate(atoms_out, axis=0)
436 P_temp = np.stack(P_temp, axis=0)
437 P_out = np.prod(P_temp, axis=0)
439 assert np.isclose(np.sum(P_out), 1), "Probabilities do not sum to 1!"
441 # Make the limit dictionary
442 infimum = np.concatenate(
443 [distributions[i].limit["infimum"] for i in range(dstn_count)]
444 )
445 supremum = np.concatenate(
446 [distributions[i].limit["supremum"] for i in range(dstn_count)]
447 )
448 limit = {
449 "dist": distributions,
450 "method": "combine_indep_dstns",
451 "infimum": infimum,
452 "supremum": supremum,
453 }
455 if all_labeled and labels_are_unique:
456 combined_dstn = DiscreteDistributionLabeled(
457 pmv=P_out,
458 atoms=atoms_out,
459 var_names=var_labels,
460 limit=limit,
461 seed=seed,
462 )
463 else:
464 if all_labeled and not labels_are_unique:
465 warn(
466 "There are duplicated labels in the provided distributions. Returning a non-labeled combination"
467 )
468 combined_dstn = DiscreteDistribution(P_out, atoms_out, limit=limit, seed=seed)
470 return combined_dstn
473def calc_expectation(dstn, func=None, *args, **kwargs):
474 """
475 Expectation of a function, given an array of configurations of its inputs
476 along with a DiscreteDistribution object that specifies the probability
477 of each configuration. Computation is performed by looping over each atom
478 of the distribution and evaluating function one at a time. This approach is
479 broadly compatible with any func, but is slow because of the loop.
481 If func is relatively simple, and particularly if it does not involve array
482 operations like tiling, reshaping, or logical indexing, consider using expected
483 instead of calc_expectation. That function evaluates all atoms simultaneously,
484 avoiding the costly loop, but with reduced compatibility with "complex" operations.
486 Parameters
487 ----------
488 dstn : DiscreteDistribution or DiscreteDistributionLabeled
489 The distribution over which the function is to be evaluated.
490 func : function or None
491 The function to be evaluated. If dstn is a DiscreteDistribution, then this
492 function should take an array of shape dstn.dim() and return either arrays
493 of arbitrary shape or scalars. If dstn is a DiscreteDistributionLabeled, then
494 the function should take an dictionary-like object (like an xr.dataset) and
495 index into it with variable names. In either case, the function may also
496 take other arguments \\*args. Defaults to identity function.
497 \\*args :
498 Other inputs for func, representing the non-stochastic arguments.
499 The expectation is computed at ``f(dstn, *args, **kwargs)``.
500 \\*kwargs :
501 Other keyword inputs for func, representing the non-stochastic arguments.
502 The expectation is computed at ``f(dstn, *args, **kwargs)``.
504 Returns
505 -------
506 f_exp : np.array or scalar
507 The expectation of the function at the queried values.
508 Scalar if only one value.
509 """
510 func = func or (lambda x: x)
512 if hasattr(dstn, "dataset"): # cheap test for DiscreteDistributionLabeled
513 f_query = []
514 for i in range(len(dstn.pmv)):
515 temp_dict = {
516 key: float(dstn.variables[key][i]) for key in dstn.variables.keys()
517 }
518 f_query.append(func(temp_dict, *args, **kwargs))
519 else:
520 f_query = [
521 func(dstn.atoms[..., i], *args, **kwargs) for i in range(len(dstn.pmv))
522 ]
524 f_query = np.stack(f_query, axis=-1)
526 # From the numpy.dot documentation:
527 # If both a and b are 1-D arrays, it is inner product of vectors (without complex conjugation).
528 # If a is an N-D array and b is a 1-D array, it is a sum product over the last axis of a and b.
529 # Thus, if func returns scalars, f_exp will be a scalar and if it returns arrays f_exp
530 # will be an array of the same shape.
531 f_exp = np.dot(f_query, dstn.pmv)
533 return f_exp
536def distr_of_function(dstn, func=lambda x: x, *args):
537 """
538 Finds the distribution of a random variable Y that is a function
539 of discrete random variable atoms, Y=f(atoms).
541 Parameters
542 ----------
543 dstn : DiscreteDistribution
544 The distribution over which the function is to be evaluated.
545 func : function
546 The function to be evaluated.
547 This function should take an array of shape dstn.dim().
548 It may also take other arguments \\*args.
549 \\*args :
550 Additional non-stochastic arguments for func,
551 The function is computed at ``f(dstn, *args)``.
553 Returns
554 -------
555 f_dstn : DiscreteDistribution
556 The distribution of func(dstn).
557 """
558 # Apply function to every event realization
559 f_query = [func(dstn.atoms[..., i], *args) for i in range(len(dstn.pmv))]
561 # Stack results along their last (new) axis
562 f_query = np.stack(f_query, axis=-1)
564 f_dstn = DiscreteDistribution(dstn.pmv, f_query)
566 return f_dstn
569def expected(func=None, dist=None, args=(), **kwargs):
570 """
571 Compute the expectation of a function, given an array of configurations of its
572 inputs along with a DiscreteDistribution object that specifies the probability
573 of each configuration.
575 This approach will only work correctly with relatively simple functions that
576 do not involve manipulation of arrays, including reshaping and tiling, etc.
577 If the func you want to use has complex operations like this, use calc_expectation
578 instead. It performs the same operation, but by looping over each atom in the
579 distribution. In contrast, expected uses array operations and tries to compute
580 all atoms simultaneously.
582 Parameters
583 ----------
584 func : function
585 The function to be evaluated. This function should take the full array of
586 distribution values and return either arrays of arbitrary shape or scalars.
587 It may also take other arguments ``*args``. This function differs from the
588 `calc_expectation` function in that it uses numpy's vectorization and broad-
589 casting rules to avoid costly iteration.
590 Note: If you need to use a function that acts on single outcomes
591 of the distribution, use `distribution.calc_expectation` instead.
592 dist : DiscreteDistribution or DiscreteDistributionLabeled
593 The distribution over which the function is to be evaluated.
594 args : tuple
595 Other inputs for func, representing the non-stochastic arguments.
596 The expectation is computed at ``f(dstn, *args)``.
597 labels : bool
598 If True, the function should use labeled indexing instead of integer
599 indexing using the distribution's underlying rv coordinates. For example,
600 if `dims = ('rv', 'x')` and `coords = {'rv': ['a', 'b'], }`, then
601 the function can be `lambda x: x["a"] + x["b"]`.
603 Returns
604 -------
605 f_exp : np.array or scalar
606 The expectation of the function at the queried values.
607 Scalar if only one value.
608 """
610 if not isinstance(args, tuple):
611 args = (args,)
613 if isinstance(dist, DiscreteDistributionLabeled):
614 return dist.expected(func, *args, **kwargs)
615 elif isinstance(dist, DiscreteDistribution):
616 return dist.expected(func, *args)