Coverage for HARK/numba_tools.py: 100%
10 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-02 05:14 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-02 05:14 +0000
1import numpy as np
2from numba import njit
4from HARK.rewards import (
5 CRRAutility,
6 CRRAutility_inv,
7 CRRAutility_invP,
8 CRRAutilityP,
9 CRRAutilityP_inv,
10 CRRAutilityP_invP,
11 CRRAutilityPP,
12)
14CRRAutility = njit(CRRAutility, cache=True)
15CRRAutilityP = njit(CRRAutilityP, cache=True)
16CRRAutilityPP = njit(CRRAutilityPP, cache=True)
17CRRAutilityP_inv = njit(CRRAutilityP_inv, cache=True)
18CRRAutility_invP = njit(CRRAutility_invP, cache=True)
19CRRAutility_inv = njit(CRRAutility_inv, cache=True)
20CRRAutilityP_invP = njit(CRRAutilityP_invP, cache=True)
23@njit(cache=True, error_model="numpy")
24def _interp_decay(
25 x0, x_list, y_list, intercept_limit, slope_limit, lower_extrap
26): # pragma: no cover
27 # Make a decay extrapolation
28 slope_at_top = (y_list[-1] - y_list[-2]) / (x_list[-1] - x_list[-2])
29 level_diff = intercept_limit + slope_limit * x_list[-1] - y_list[-1]
30 slope_diff = slope_limit - slope_at_top
32 decay_extrap_A = level_diff
33 decay_extrap_B = -slope_diff / level_diff
34 intercept_limit = intercept_limit
35 slope_limit = slope_limit
37 i = np.maximum(np.searchsorted(x_list[:-1], x0), 1)
38 alpha = (x0 - x_list[i - 1]) / (x_list[i] - x_list[i - 1])
39 y0 = (1.0 - alpha) * y_list[i - 1] + alpha * y_list[i]
41 if not lower_extrap:
42 below_lower_bound = x0 < x_list[0]
43 y0[below_lower_bound] = np.nan
45 above_upper_bound = x0 > x_list[-1]
46 x_temp = x0[above_upper_bound] - x_list[-1]
48 y0[above_upper_bound] = (
49 intercept_limit
50 + slope_limit * x0[above_upper_bound]
51 - decay_extrap_A * np.exp(-decay_extrap_B * x_temp)
52 )
54 return y0
57@njit(cache=True, error_model="numpy")
58def _interp_linear(x0, x_list, y_list, lower_extrap): # pragma: no cover
59 i = np.maximum(np.searchsorted(x_list[:-1], x0), 1)
60 alpha = (x0 - x_list[i - 1]) / (x_list[i] - x_list[i - 1])
61 y0 = (1.0 - alpha) * y_list[i - 1] + alpha * y_list[i]
63 if not lower_extrap:
64 below_lower_bound = x0 < x_list[0]
65 y0[below_lower_bound] = np.nan
67 return y0
70@njit(cache=True, error_model="numpy")
71def linear_interp_fast(
72 x0, x_list, y_list, intercept_limit=None, slope_limit=None, lower_extrap=False
73): # pragma: no cover
74 if intercept_limit is None and slope_limit is None:
75 return _interp_linear(x0, x_list, y_list, lower_extrap)
76 else:
77 return _interp_decay(
78 x0, x_list, y_list, intercept_limit, slope_limit, lower_extrap
79 )
82@njit(cache=True, error_model="numpy")
83def _interp_linear_deriv(x0, x_list, y_list, lower_extrap): # pragma: no cover
84 i = np.maximum(np.searchsorted(x_list[:-1], x0), 1)
85 alpha = (x0 - x_list[i - 1]) / (x_list[i] - x_list[i - 1])
86 y0 = (1.0 - alpha) * y_list[i - 1] + alpha * y_list[i]
87 dydx = (y_list[i] - y_list[i - 1]) / (x_list[i] - x_list[i - 1])
89 if not lower_extrap:
90 below_lower_bound = x0 < x_list[0]
91 y0[below_lower_bound] = np.nan
92 dydx[below_lower_bound] = np.nan
94 return y0, dydx
97@njit(cache=True, error_model="numpy")
98def _interp_decay_deriv(
99 x0, x_list, y_list, intercept_limit, slope_limit, lower_extrap
100): # pragma: no cover
101 # Make a decay extrapolation
102 slope_at_top = (y_list[-1] - y_list[-2]) / (x_list[-1] - x_list[-2])
103 level_diff = intercept_limit + slope_limit * x_list[-1] - y_list[-1]
104 slope_diff = slope_limit - slope_at_top
106 decay_extrap_A = level_diff
107 decay_extrap_B = -slope_diff / level_diff
108 intercept_limit = intercept_limit
109 slope_limit = slope_limit
111 i = np.maximum(np.searchsorted(x_list[:-1], x0), 1)
112 alpha = (x0 - x_list[i - 1]) / (x_list[i] - x_list[i - 1])
113 y0 = (1.0 - alpha) * y_list[i - 1] + alpha * y_list[i]
114 dydx = (y_list[i] - y_list[i - 1]) / (x_list[i] - x_list[i - 1])
116 if not lower_extrap:
117 below_lower_bound = x0 < x_list[0]
118 y0[below_lower_bound] = np.nan
119 dydx[below_lower_bound] = np.nan
121 above_upper_bound = x0 > x_list[-1]
122 x_temp = x0[above_upper_bound] - x_list[-1]
124 y0[above_upper_bound] = (
125 intercept_limit
126 + slope_limit * x0[above_upper_bound]
127 - decay_extrap_A * np.exp(-decay_extrap_B * x_temp)
128 )
130 dydx[above_upper_bound] = slope_limit + decay_extrap_B * decay_extrap_A * np.exp(
131 -decay_extrap_B * x_temp
132 )
134 return y0, dydx
137@njit(cache=True, error_model="numpy")
138def linear_interp_deriv_fast(
139 x0, x_list, y_list, intercept_limit=None, slope_limit=None, lower_extrap=False
140): # pragma: no cover
141 if intercept_limit is None and slope_limit is None:
142 return _interp_linear_deriv(x0, x_list, y_list, lower_extrap)
143 else:
144 return _interp_decay_deriv(
145 x0, x_list, y_list, intercept_limit, slope_limit, lower_extrap
146 )
149@njit(cache=True, error_model="numpy")
150def _spline_decay(
151 x_init, x_list, y_list, dydx_list, intercept_limit, slope_limit, lower_extrap
152): # pragma: no cover
153 n = x_list.size
155 coeffs = np.empty((n + 1, 4))
157 # Define lower extrapolation as linear function (or just NaN)
158 if lower_extrap:
159 coeffs[0] = np.array([y_list[0], dydx_list[0], 0, 0])
160 else:
161 coeffs[0] = np.array([np.nan, np.nan, np.nan, np.nan])
163 # Calculate interpolation coefficients on segments mapped to [0,1]
164 xdiff = np.diff(x_list)
165 ydiff = np.diff(y_list)
166 dydx0 = dydx_list[:-1] * xdiff
167 dydx1 = dydx_list[1:] * xdiff
168 coeffs[1:-1, 0] = y_list[:-1]
169 coeffs[1:-1, 1] = dydx0
170 coeffs[1:-1, 2] = 3 * ydiff - 2 * dydx0 - dydx1
171 coeffs[1:-1, 3] = -2 * ydiff + dydx0 + dydx1
173 # Calculate extrapolation coefficients as a decay toward limiting function y = mx+b
174 gap = slope_limit * x_list[n - 1] + intercept_limit - y_list[n - 1]
175 slope = slope_limit - dydx_list[n - 1]
176 if (gap != 0) and (slope <= 0):
177 coeffs[-1] = np.array([intercept_limit, slope_limit, gap, slope / gap])
178 elif slope > 0:
179 # fixing a problem when slope is positive
180 coeffs[-1] = np.array([intercept_limit, slope_limit, 0, 0])
181 else:
182 coeffs[-1] = np.array([intercept_limit, slope_limit, gap, 0])
184 m = len(x_init)
185 pos = np.searchsorted(x_list, x_init)
186 y = np.zeros(m)
187 if y.size > 0:
188 out_bot = pos == 0
189 out_top = pos == n
190 in_bnds = np.logical_not(np.logical_or(out_bot, out_top))
192 # Do the "in bounds" evaluation points
193 i = pos[in_bnds]
194 coeffs_in = coeffs[i, :]
195 alpha = (x_init[in_bnds] - x_list[i - 1]) / (x_list[i] - x_list[i - 1])
196 y[in_bnds] = coeffs_in[:, 0] + alpha * (
197 coeffs_in[:, 1] + alpha * (coeffs_in[:, 2] + alpha * coeffs_in[:, 3])
198 )
200 # Do the "out of bounds" evaluation points
201 y[out_bot] = coeffs[0, 0] + coeffs[0, 1] * (x_init[out_bot] - x_list[0])
202 alpha = x_init[out_top] - x_list[n - 1]
203 y[out_top] = (
204 coeffs[n, 0]
205 + x_init[out_top] * coeffs[n, 1]
206 - coeffs[n, 2] * np.exp(alpha * coeffs[n, 3])
207 )
209 dydx = np.zeros(m)
210 if dydx.size > 0:
211 out_bot = pos == 0
212 out_top = pos == n
213 in_bnds = np.logical_not(np.logical_or(out_bot, out_top))
215 # Do the "in bounds" evaluation points
216 i = pos[in_bnds]
217 coeffs_in = coeffs[i, :]
218 alpha = (x_init[in_bnds] - x_list[i - 1]) / (x_list[i] - x_list[i - 1])
219 dydx[in_bnds] = (
220 coeffs_in[:, 1]
221 + alpha * (2 * coeffs_in[:, 2] + alpha * 3 * coeffs_in[:, 3])
222 ) / (x_list[i] - x_list[i - 1])
224 # Do the "out of bounds" evaluation points
225 dydx[out_bot] = coeffs[0, 1]
226 alpha = x_init[out_top] - x_list[n - 1]
227 dydx[out_top] = coeffs[n, 1] - coeffs[n, 2] * coeffs[n, 3] * np.exp(
228 alpha * coeffs[n, 3]
229 )
231 return y, dydx
234@njit(cache=True, error_model="numpy")
235def cubic_interp_fast(
236 x0,
237 x_list,
238 y_list,
239 dydx_list,
240 intercept_limit=None,
241 slope_limit=None,
242 lower_extrap=False,
243): # pragma: no cover
244 if intercept_limit is None and slope_limit is None:
245 slope = dydx_list[-1]
246 intercept = y_list[-1] - slope * x_list[-1]
248 return _spline_decay(
249 x0, x_list, y_list, dydx_list, intercept, slope, lower_extrap
250 )
251 else:
252 return _spline_decay(
253 x0, x_list, y_list, dydx_list, intercept_limit, slope_limit, lower_extrap
254 )