Coverage for HARK / numba_tools.py: 100%
10 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-08 05:31 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-08 05:31 +0000
1import numpy as np
2from numba import njit
4from HARK.rewards import (
5 CRRAutility_X,
6 CRRAutility_inv,
7 CRRAutility_invP,
8 CRRAutilityP_X,
9 CRRAutilityP_inv,
10 CRRAutilityP_invP,
11 CRRAutilityPP_X,
12)
14CRRAutility = njit(CRRAutility_X, cache=True)
15CRRAutilityP = njit(CRRAutilityP_X, cache=True)
16CRRAutilityPP = njit(CRRAutilityPP_X, cache=True)
17CRRAutilityP_inv = njit(CRRAutilityP_inv, cache=True)
18CRRAutility_invP = njit(CRRAutility_invP, cache=True)
19CRRAutility_inv = njit(CRRAutility_inv, cache=True)
20CRRAutilityP_invP = njit(CRRAutilityP_invP, cache=True)
23@njit(cache=True, error_model="numpy")
24def _interp_decay(
25 x0, x_list, y_list, intercept_limit, slope_limit, lower_extrap
26): # pragma: no cover
27 # Make a decay extrapolation
28 slope_at_top = (y_list[-1] - y_list[-2]) / (x_list[-1] - x_list[-2])
29 level_diff = intercept_limit + slope_limit * x_list[-1] - y_list[-1]
30 slope_diff = slope_limit - slope_at_top
32 decay_extrap_A = level_diff
33 decay_extrap_B = -slope_diff / level_diff
34 intercept_limit = intercept_limit
35 slope_limit = slope_limit
37 i = np.maximum(np.searchsorted(x_list[:-1], x0), 1)
38 alpha = (x0 - x_list[i - 1]) / (x_list[i] - x_list[i - 1])
39 y0 = (1.0 - alpha) * y_list[i - 1] + alpha * y_list[i]
41 if not lower_extrap:
42 below_lower_bound = x0 < x_list[0]
43 y0[below_lower_bound] = np.nan
45 above_upper_bound = x0 > x_list[-1]
46 x_temp = x0[above_upper_bound] - x_list[-1]
48 y0[above_upper_bound] = (
49 intercept_limit
50 + slope_limit * x0[above_upper_bound]
51 - decay_extrap_A * np.exp(-decay_extrap_B * x_temp)
52 )
54 return y0
57@njit(cache=True, error_model="numpy")
58def _interp_linear(x0, x_list, y_list, lower_extrap): # pragma: no cover
59 i = np.maximum(np.searchsorted(x_list[:-1], x0), 1)
60 alpha = (x0 - x_list[i - 1]) / (x_list[i] - x_list[i - 1])
61 y0 = (1.0 - alpha) * y_list[i - 1] + alpha * y_list[i]
63 if not lower_extrap:
64 below_lower_bound = x0 < x_list[0]
65 y0[below_lower_bound] = np.nan
67 return y0
70@njit(cache=True, error_model="numpy")
71def linear_interp_fast(
72 x0, x_list, y_list, intercept_limit=None, slope_limit=None, lower_extrap=False
73): # pragma: no cover
74 if intercept_limit is None and slope_limit is None:
75 return _interp_linear(x0, x_list, y_list, lower_extrap)
76 else:
77 return _interp_decay(
78 x0, x_list, y_list, intercept_limit, slope_limit, lower_extrap
79 )
82@njit(cache=True, error_model="numpy")
83def _interp_linear_deriv(x0, x_list, y_list, lower_extrap): # pragma: no cover
84 i = np.maximum(np.searchsorted(x_list[:-1], x0), 1)
85 alpha = (x0 - x_list[i - 1]) / (x_list[i] - x_list[i - 1])
86 y0 = (1.0 - alpha) * y_list[i - 1] + alpha * y_list[i]
87 dydx = (y_list[i] - y_list[i - 1]) / (x_list[i] - x_list[i - 1])
89 if not lower_extrap:
90 below_lower_bound = x0 < x_list[0]
91 y0[below_lower_bound] = np.nan
92 dydx[below_lower_bound] = np.nan
94 return y0, dydx
97@njit(cache=True, error_model="numpy")
98def _interp_decay_deriv(
99 x0, x_list, y_list, intercept_limit, slope_limit, lower_extrap
100): # pragma: no cover
101 # Make a decay extrapolation
102 slope_at_top = (y_list[-1] - y_list[-2]) / (x_list[-1] - x_list[-2])
103 level_diff = intercept_limit + slope_limit * x_list[-1] - y_list[-1]
104 slope_diff = slope_limit - slope_at_top
106 decay_extrap_A = level_diff
107 decay_extrap_B = -slope_diff / level_diff
108 intercept_limit = intercept_limit
109 slope_limit = slope_limit
111 i = np.maximum(np.searchsorted(x_list[:-1], x0), 1)
112 alpha = (x0 - x_list[i - 1]) / (x_list[i] - x_list[i - 1])
113 y0 = (1.0 - alpha) * y_list[i - 1] + alpha * y_list[i]
114 dydx = (y_list[i] - y_list[i - 1]) / (x_list[i] - x_list[i - 1])
116 if not lower_extrap:
117 below_lower_bound = x0 < x_list[0]
118 y0[below_lower_bound] = np.nan
119 dydx[below_lower_bound] = np.nan
121 above_upper_bound = x0 > x_list[-1]
122 x_temp = x0[above_upper_bound] - x_list[-1]
124 y0[above_upper_bound] = (
125 intercept_limit
126 + slope_limit * x0[above_upper_bound]
127 - decay_extrap_A * np.exp(-decay_extrap_B * x_temp)
128 )
130 dydx[above_upper_bound] = slope_limit + decay_extrap_B * decay_extrap_A * np.exp(
131 -decay_extrap_B * x_temp
132 )
134 return y0, dydx
137@njit(cache=True, error_model="numpy")
138def linear_interp_deriv_fast(
139 x0, x_list, y_list, intercept_limit=None, slope_limit=None, lower_extrap=False
140): # pragma: no cover
141 if intercept_limit is None and slope_limit is None:
142 return _interp_linear_deriv(x0, x_list, y_list, lower_extrap)
143 else:
144 return _interp_decay_deriv(
145 x0, x_list, y_list, intercept_limit, slope_limit, lower_extrap
146 )
149@njit(cache=True, error_model="numpy")
150def _spline_decay(
151 x_init, x_list, y_list, dydx_list, intercept_limit, slope_limit, lower_extrap
152): # pragma: no cover
153 n = x_list.size
155 coeffs = np.empty((n + 1, 4))
157 # Define lower extrapolation as linear function (or just NaN)
158 if lower_extrap:
159 coeffs[0] = np.array([y_list[0], dydx_list[0], 0, 0])
160 else:
161 coeffs[0] = np.array([np.nan, np.nan, np.nan, np.nan])
163 # Calculate interpolation coefficients on segments mapped to [0,1]
164 xdiff = np.diff(x_list)
165 ydiff = np.diff(y_list)
166 dydx0 = dydx_list[:-1] * xdiff
167 dydx1 = dydx_list[1:] * xdiff
168 coeffs[1:-1, 0] = y_list[:-1]
169 coeffs[1:-1, 1] = dydx0
170 coeffs[1:-1, 2] = 3 * ydiff - 2 * dydx0 - dydx1
171 coeffs[1:-1, 3] = -2 * ydiff + dydx0 + dydx1
173 # Calculate extrapolation coefficients as a decay toward limiting function y = mx+b
174 gap = slope_limit * x_list[n - 1] + intercept_limit - y_list[n - 1]
175 slope = slope_limit - dydx_list[n - 1]
176 if (gap != 0) and (slope <= 0):
177 coeffs[-1] = np.array([intercept_limit, slope_limit, gap, slope / gap])
178 elif slope > 0:
179 # fixing a problem when slope is positive
180 coeffs[-1] = np.array([intercept_limit, slope_limit, 0, 0])
181 else:
182 coeffs[-1] = np.array([intercept_limit, slope_limit, gap, 0])
184 m = len(x_init)
185 pos = np.searchsorted(x_list, x_init)
186 y = np.zeros(m)
187 dydx = np.zeros(m)
189 if m > 0:
190 out_bot = pos == 0
191 out_top = pos == n
192 in_bnds = np.logical_not(np.logical_or(out_bot, out_top))
194 # In-bounds evaluation
195 i = pos[in_bnds]
196 coeffs_in = coeffs[i, :]
197 alpha_in = (x_init[in_bnds] - x_list[i - 1]) / (x_list[i] - x_list[i - 1])
198 y[in_bnds] = coeffs_in[:, 0] + alpha_in * (
199 coeffs_in[:, 1] + alpha_in * (coeffs_in[:, 2] + alpha_in * coeffs_in[:, 3])
200 )
201 dydx[in_bnds] = (
202 coeffs_in[:, 1]
203 + alpha_in * (2 * coeffs_in[:, 2] + alpha_in * 3 * coeffs_in[:, 3])
204 ) / (x_list[i] - x_list[i - 1])
206 # Out-of-bounds: bottom
207 y[out_bot] = coeffs[0, 0] + coeffs[0, 1] * (x_init[out_bot] - x_list[0])
208 dydx[out_bot] = coeffs[0, 1]
210 # Out-of-bounds: top
211 alpha_top = x_init[out_top] - x_list[n - 1]
212 y[out_top] = (
213 coeffs[n, 0]
214 + x_init[out_top] * coeffs[n, 1]
215 - coeffs[n, 2] * np.exp(alpha_top * coeffs[n, 3])
216 )
217 dydx[out_top] = coeffs[n, 1] - coeffs[n, 2] * coeffs[n, 3] * np.exp(
218 alpha_top * coeffs[n, 3]
219 )
221 return y, dydx
224@njit(cache=True, error_model="numpy")
225def cubic_interp_fast(
226 x0,
227 x_list,
228 y_list,
229 dydx_list,
230 intercept_limit=None,
231 slope_limit=None,
232 lower_extrap=False,
233): # pragma: no cover
234 if intercept_limit is None and slope_limit is None:
235 slope = dydx_list[-1]
236 intercept = y_list[-1] - slope * x_list[-1]
238 return _spline_decay(
239 x0, x_list, y_list, dydx_list, intercept, slope, lower_extrap
240 )
241 else:
242 return _spline_decay(
243 x0, x_list, y_list, dydx_list, intercept_limit, slope_limit, lower_extrap
244 )