Coverage for HARK / numba_tools.py: 100%
10 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-10 06:19 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-10 06:19 +0000
1import numpy as np
2from numba import njit
4from HARK.rewards import (
5 CRRAutility_X,
6 CRRAutility_inv,
7 CRRAutility_invP,
8 CRRAutilityP_X,
9 CRRAutilityP_inv,
10 CRRAutilityP_invP,
11 CRRAutilityPP_X,
12)
14CRRAutility = njit(CRRAutility_X, cache=True)
15CRRAutilityP = njit(CRRAutilityP_X, cache=True)
16CRRAutilityPP = njit(CRRAutilityPP_X, cache=True)
17CRRAutilityP_inv = njit(CRRAutilityP_inv, cache=True)
18CRRAutility_invP = njit(CRRAutility_invP, cache=True)
19CRRAutility_inv = njit(CRRAutility_inv, cache=True)
20CRRAutilityP_invP = njit(CRRAutilityP_invP, cache=True)
23@njit(cache=True, error_model="numpy")
24def _decay_extrap_coeffs(
25 x_list, y_list, intercept_limit, slope_limit
26): # pragma: no cover
27 # Coefficients for decay extrapolation toward the line y = slope*x + intercept.
28 slope_at_top = (y_list[-1] - y_list[-2]) / (x_list[-1] - x_list[-2])
29 level_diff = intercept_limit + slope_limit * x_list[-1] - y_list[-1]
30 slope_diff = slope_limit - slope_at_top
31 decay_extrap_A = level_diff
32 decay_extrap_B = -slope_diff / level_diff
33 return decay_extrap_A, decay_extrap_B
36@njit(cache=True, error_model="numpy")
37def _interp_linear(x0, x_list, y_list, lower_extrap): # pragma: no cover
38 i = np.maximum(np.searchsorted(x_list[:-1], x0), 1)
39 alpha = (x0 - x_list[i - 1]) / (x_list[i] - x_list[i - 1])
40 y0 = (1.0 - alpha) * y_list[i - 1] + alpha * y_list[i]
42 if not lower_extrap:
43 below_lower_bound = x0 < x_list[0]
44 y0[below_lower_bound] = np.nan
46 return y0
49@njit(cache=True, error_model="numpy")
50def _apply_y_decay_extrap(
51 x0, y0, x_list, intercept_limit, slope_limit, decay_extrap_A, decay_extrap_B
52): # pragma: no cover
53 above_upper_bound = x0 > x_list[-1]
54 x_temp = x0[above_upper_bound] - x_list[-1]
55 y0[above_upper_bound] = (
56 intercept_limit
57 + slope_limit * x0[above_upper_bound]
58 - decay_extrap_A * np.exp(-decay_extrap_B * x_temp)
59 )
60 return above_upper_bound, x_temp
63@njit(cache=True, error_model="numpy")
64def _interp_decay(
65 x0, x_list, y_list, intercept_limit, slope_limit, lower_extrap
66): # pragma: no cover
67 decay_extrap_A, decay_extrap_B = _decay_extrap_coeffs(
68 x_list, y_list, intercept_limit, slope_limit
69 )
70 y0 = _interp_linear(x0, x_list, y_list, lower_extrap)
71 _apply_y_decay_extrap(
72 x0, y0, x_list, intercept_limit, slope_limit, decay_extrap_A, decay_extrap_B
73 )
74 return y0
77@njit(cache=True, error_model="numpy")
78def linear_interp_fast(
79 x0, x_list, y_list, intercept_limit=None, slope_limit=None, lower_extrap=False
80): # pragma: no cover
81 if intercept_limit is None and slope_limit is None:
82 return _interp_linear(x0, x_list, y_list, lower_extrap)
83 else:
84 return _interp_decay(
85 x0, x_list, y_list, intercept_limit, slope_limit, lower_extrap
86 )
89@njit(cache=True, error_model="numpy")
90def _interp_linear_deriv(x0, x_list, y_list, lower_extrap): # pragma: no cover
91 i = np.maximum(np.searchsorted(x_list[:-1], x0), 1)
92 alpha = (x0 - x_list[i - 1]) / (x_list[i] - x_list[i - 1])
93 y0 = (1.0 - alpha) * y_list[i - 1] + alpha * y_list[i]
94 dydx = (y_list[i] - y_list[i - 1]) / (x_list[i] - x_list[i - 1])
96 if not lower_extrap:
97 below_lower_bound = x0 < x_list[0]
98 y0[below_lower_bound] = np.nan
99 dydx[below_lower_bound] = np.nan
101 return y0, dydx
104@njit(cache=True, error_model="numpy")
105def _interp_decay_deriv(
106 x0, x_list, y_list, intercept_limit, slope_limit, lower_extrap
107): # pragma: no cover
108 decay_extrap_A, decay_extrap_B = _decay_extrap_coeffs(
109 x_list, y_list, intercept_limit, slope_limit
110 )
111 y0, dydx = _interp_linear_deriv(x0, x_list, y_list, lower_extrap)
112 above_upper_bound, x_temp = _apply_y_decay_extrap(
113 x0, y0, x_list, intercept_limit, slope_limit, decay_extrap_A, decay_extrap_B
114 )
115 dydx[above_upper_bound] = slope_limit + decay_extrap_B * decay_extrap_A * np.exp(
116 -decay_extrap_B * x_temp
117 )
118 return y0, dydx
121@njit(cache=True, error_model="numpy")
122def linear_interp_deriv_fast(
123 x0, x_list, y_list, intercept_limit=None, slope_limit=None, lower_extrap=False
124): # pragma: no cover
125 if intercept_limit is None and slope_limit is None:
126 return _interp_linear_deriv(x0, x_list, y_list, lower_extrap)
127 else:
128 return _interp_decay_deriv(
129 x0, x_list, y_list, intercept_limit, slope_limit, lower_extrap
130 )
133@njit(cache=True, error_model="numpy")
134def _spline_decay(
135 x_init, x_list, y_list, dydx_list, intercept_limit, slope_limit, lower_extrap
136): # pragma: no cover
137 n = x_list.size
139 coeffs = np.empty((n + 1, 4))
141 # Define lower extrapolation as linear function (or just NaN)
142 if lower_extrap:
143 coeffs[0] = np.array([y_list[0], dydx_list[0], 0, 0])
144 else:
145 coeffs[0] = np.array([np.nan, np.nan, np.nan, np.nan])
147 # Calculate interpolation coefficients on segments mapped to [0,1]
148 xdiff = np.diff(x_list)
149 ydiff = np.diff(y_list)
150 dydx0 = dydx_list[:-1] * xdiff
151 dydx1 = dydx_list[1:] * xdiff
152 coeffs[1:-1, 0] = y_list[:-1]
153 coeffs[1:-1, 1] = dydx0
154 coeffs[1:-1, 2] = 3 * ydiff - 2 * dydx0 - dydx1
155 coeffs[1:-1, 3] = -2 * ydiff + dydx0 + dydx1
157 # Calculate extrapolation coefficients as a decay toward limiting function y = mx+b
158 gap = slope_limit * x_list[n - 1] + intercept_limit - y_list[n - 1]
159 slope = slope_limit - dydx_list[n - 1]
160 if (gap != 0) and (slope <= 0):
161 coeffs[-1] = np.array([intercept_limit, slope_limit, gap, slope / gap])
162 elif slope > 0:
163 # fixing a problem when slope is positive
164 coeffs[-1] = np.array([intercept_limit, slope_limit, 0, 0])
165 else:
166 coeffs[-1] = np.array([intercept_limit, slope_limit, gap, 0])
168 m = len(x_init)
169 pos = np.searchsorted(x_list, x_init)
170 y = np.zeros(m)
171 dydx = np.zeros(m)
173 if m > 0:
174 out_bot = pos == 0
175 out_top = pos == n
176 in_bnds = np.logical_not(np.logical_or(out_bot, out_top))
178 # In-bounds evaluation
179 i = pos[in_bnds]
180 coeffs_in = coeffs[i, :]
181 alpha_in = (x_init[in_bnds] - x_list[i - 1]) / (x_list[i] - x_list[i - 1])
182 y[in_bnds] = coeffs_in[:, 0] + alpha_in * (
183 coeffs_in[:, 1] + alpha_in * (coeffs_in[:, 2] + alpha_in * coeffs_in[:, 3])
184 )
185 dydx[in_bnds] = (
186 coeffs_in[:, 1]
187 + alpha_in * (2 * coeffs_in[:, 2] + alpha_in * 3 * coeffs_in[:, 3])
188 ) / (x_list[i] - x_list[i - 1])
190 # Out-of-bounds: bottom
191 y[out_bot] = coeffs[0, 0] + coeffs[0, 1] * (x_init[out_bot] - x_list[0])
192 dydx[out_bot] = coeffs[0, 1]
194 # Out-of-bounds: top
195 alpha_top = x_init[out_top] - x_list[n - 1]
196 y[out_top] = (
197 coeffs[n, 0]
198 + x_init[out_top] * coeffs[n, 1]
199 - coeffs[n, 2] * np.exp(alpha_top * coeffs[n, 3])
200 )
201 dydx[out_top] = coeffs[n, 1] - coeffs[n, 2] * coeffs[n, 3] * np.exp(
202 alpha_top * coeffs[n, 3]
203 )
205 return y, dydx
208@njit(cache=True, error_model="numpy")
209def cubic_interp_fast(
210 x0,
211 x_list,
212 y_list,
213 dydx_list,
214 intercept_limit=None,
215 slope_limit=None,
216 lower_extrap=False,
217): # pragma: no cover
218 if intercept_limit is None and slope_limit is None:
219 slope = dydx_list[-1]
220 intercept = y_list[-1] - slope * x_list[-1]
222 return _spline_decay(
223 x0, x_list, y_list, dydx_list, intercept, slope, lower_extrap
224 )
225 else:
226 return _spline_decay(
227 x0, x_list, y_list, dydx_list, intercept_limit, slope_limit, lower_extrap
228 )