Coverage for HARK / numba_tools.py: 100%

10 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-08 05:31 +0000

1import numpy as np 

2from numba import njit 

3 

4from HARK.rewards import ( 

5 CRRAutility_X, 

6 CRRAutility_inv, 

7 CRRAutility_invP, 

8 CRRAutilityP_X, 

9 CRRAutilityP_inv, 

10 CRRAutilityP_invP, 

11 CRRAutilityPP_X, 

12) 

13 

14CRRAutility = njit(CRRAutility_X, cache=True) 

15CRRAutilityP = njit(CRRAutilityP_X, cache=True) 

16CRRAutilityPP = njit(CRRAutilityPP_X, cache=True) 

17CRRAutilityP_inv = njit(CRRAutilityP_inv, cache=True) 

18CRRAutility_invP = njit(CRRAutility_invP, cache=True) 

19CRRAutility_inv = njit(CRRAutility_inv, cache=True) 

20CRRAutilityP_invP = njit(CRRAutilityP_invP, cache=True) 

21 

22 

23@njit(cache=True, error_model="numpy") 

24def _interp_decay( 

25 x0, x_list, y_list, intercept_limit, slope_limit, lower_extrap 

26): # pragma: no cover 

27 # Make a decay extrapolation 

28 slope_at_top = (y_list[-1] - y_list[-2]) / (x_list[-1] - x_list[-2]) 

29 level_diff = intercept_limit + slope_limit * x_list[-1] - y_list[-1] 

30 slope_diff = slope_limit - slope_at_top 

31 

32 decay_extrap_A = level_diff 

33 decay_extrap_B = -slope_diff / level_diff 

34 intercept_limit = intercept_limit 

35 slope_limit = slope_limit 

36 

37 i = np.maximum(np.searchsorted(x_list[:-1], x0), 1) 

38 alpha = (x0 - x_list[i - 1]) / (x_list[i] - x_list[i - 1]) 

39 y0 = (1.0 - alpha) * y_list[i - 1] + alpha * y_list[i] 

40 

41 if not lower_extrap: 

42 below_lower_bound = x0 < x_list[0] 

43 y0[below_lower_bound] = np.nan 

44 

45 above_upper_bound = x0 > x_list[-1] 

46 x_temp = x0[above_upper_bound] - x_list[-1] 

47 

48 y0[above_upper_bound] = ( 

49 intercept_limit 

50 + slope_limit * x0[above_upper_bound] 

51 - decay_extrap_A * np.exp(-decay_extrap_B * x_temp) 

52 ) 

53 

54 return y0 

55 

56 

57@njit(cache=True, error_model="numpy") 

58def _interp_linear(x0, x_list, y_list, lower_extrap): # pragma: no cover 

59 i = np.maximum(np.searchsorted(x_list[:-1], x0), 1) 

60 alpha = (x0 - x_list[i - 1]) / (x_list[i] - x_list[i - 1]) 

61 y0 = (1.0 - alpha) * y_list[i - 1] + alpha * y_list[i] 

62 

63 if not lower_extrap: 

64 below_lower_bound = x0 < x_list[0] 

65 y0[below_lower_bound] = np.nan 

66 

67 return y0 

68 

69 

70@njit(cache=True, error_model="numpy") 

71def linear_interp_fast( 

72 x0, x_list, y_list, intercept_limit=None, slope_limit=None, lower_extrap=False 

73): # pragma: no cover 

74 if intercept_limit is None and slope_limit is None: 

75 return _interp_linear(x0, x_list, y_list, lower_extrap) 

76 else: 

77 return _interp_decay( 

78 x0, x_list, y_list, intercept_limit, slope_limit, lower_extrap 

79 ) 

80 

81 

82@njit(cache=True, error_model="numpy") 

83def _interp_linear_deriv(x0, x_list, y_list, lower_extrap): # pragma: no cover 

84 i = np.maximum(np.searchsorted(x_list[:-1], x0), 1) 

85 alpha = (x0 - x_list[i - 1]) / (x_list[i] - x_list[i - 1]) 

86 y0 = (1.0 - alpha) * y_list[i - 1] + alpha * y_list[i] 

87 dydx = (y_list[i] - y_list[i - 1]) / (x_list[i] - x_list[i - 1]) 

88 

89 if not lower_extrap: 

90 below_lower_bound = x0 < x_list[0] 

91 y0[below_lower_bound] = np.nan 

92 dydx[below_lower_bound] = np.nan 

93 

94 return y0, dydx 

95 

96 

97@njit(cache=True, error_model="numpy") 

98def _interp_decay_deriv( 

99 x0, x_list, y_list, intercept_limit, slope_limit, lower_extrap 

100): # pragma: no cover 

101 # Make a decay extrapolation 

102 slope_at_top = (y_list[-1] - y_list[-2]) / (x_list[-1] - x_list[-2]) 

103 level_diff = intercept_limit + slope_limit * x_list[-1] - y_list[-1] 

104 slope_diff = slope_limit - slope_at_top 

105 

106 decay_extrap_A = level_diff 

107 decay_extrap_B = -slope_diff / level_diff 

108 intercept_limit = intercept_limit 

109 slope_limit = slope_limit 

110 

111 i = np.maximum(np.searchsorted(x_list[:-1], x0), 1) 

112 alpha = (x0 - x_list[i - 1]) / (x_list[i] - x_list[i - 1]) 

113 y0 = (1.0 - alpha) * y_list[i - 1] + alpha * y_list[i] 

114 dydx = (y_list[i] - y_list[i - 1]) / (x_list[i] - x_list[i - 1]) 

115 

116 if not lower_extrap: 

117 below_lower_bound = x0 < x_list[0] 

118 y0[below_lower_bound] = np.nan 

119 dydx[below_lower_bound] = np.nan 

120 

121 above_upper_bound = x0 > x_list[-1] 

122 x_temp = x0[above_upper_bound] - x_list[-1] 

123 

124 y0[above_upper_bound] = ( 

125 intercept_limit 

126 + slope_limit * x0[above_upper_bound] 

127 - decay_extrap_A * np.exp(-decay_extrap_B * x_temp) 

128 ) 

129 

130 dydx[above_upper_bound] = slope_limit + decay_extrap_B * decay_extrap_A * np.exp( 

131 -decay_extrap_B * x_temp 

132 ) 

133 

134 return y0, dydx 

135 

136 

137@njit(cache=True, error_model="numpy") 

138def linear_interp_deriv_fast( 

139 x0, x_list, y_list, intercept_limit=None, slope_limit=None, lower_extrap=False 

140): # pragma: no cover 

141 if intercept_limit is None and slope_limit is None: 

142 return _interp_linear_deriv(x0, x_list, y_list, lower_extrap) 

143 else: 

144 return _interp_decay_deriv( 

145 x0, x_list, y_list, intercept_limit, slope_limit, lower_extrap 

146 ) 

147 

148 

149@njit(cache=True, error_model="numpy") 

150def _spline_decay( 

151 x_init, x_list, y_list, dydx_list, intercept_limit, slope_limit, lower_extrap 

152): # pragma: no cover 

153 n = x_list.size 

154 

155 coeffs = np.empty((n + 1, 4)) 

156 

157 # Define lower extrapolation as linear function (or just NaN) 

158 if lower_extrap: 

159 coeffs[0] = np.array([y_list[0], dydx_list[0], 0, 0]) 

160 else: 

161 coeffs[0] = np.array([np.nan, np.nan, np.nan, np.nan]) 

162 

163 # Calculate interpolation coefficients on segments mapped to [0,1] 

164 xdiff = np.diff(x_list) 

165 ydiff = np.diff(y_list) 

166 dydx0 = dydx_list[:-1] * xdiff 

167 dydx1 = dydx_list[1:] * xdiff 

168 coeffs[1:-1, 0] = y_list[:-1] 

169 coeffs[1:-1, 1] = dydx0 

170 coeffs[1:-1, 2] = 3 * ydiff - 2 * dydx0 - dydx1 

171 coeffs[1:-1, 3] = -2 * ydiff + dydx0 + dydx1 

172 

173 # Calculate extrapolation coefficients as a decay toward limiting function y = mx+b 

174 gap = slope_limit * x_list[n - 1] + intercept_limit - y_list[n - 1] 

175 slope = slope_limit - dydx_list[n - 1] 

176 if (gap != 0) and (slope <= 0): 

177 coeffs[-1] = np.array([intercept_limit, slope_limit, gap, slope / gap]) 

178 elif slope > 0: 

179 # fixing a problem when slope is positive 

180 coeffs[-1] = np.array([intercept_limit, slope_limit, 0, 0]) 

181 else: 

182 coeffs[-1] = np.array([intercept_limit, slope_limit, gap, 0]) 

183 

184 m = len(x_init) 

185 pos = np.searchsorted(x_list, x_init) 

186 y = np.zeros(m) 

187 dydx = np.zeros(m) 

188 

189 if m > 0: 

190 out_bot = pos == 0 

191 out_top = pos == n 

192 in_bnds = np.logical_not(np.logical_or(out_bot, out_top)) 

193 

194 # In-bounds evaluation 

195 i = pos[in_bnds] 

196 coeffs_in = coeffs[i, :] 

197 alpha_in = (x_init[in_bnds] - x_list[i - 1]) / (x_list[i] - x_list[i - 1]) 

198 y[in_bnds] = coeffs_in[:, 0] + alpha_in * ( 

199 coeffs_in[:, 1] + alpha_in * (coeffs_in[:, 2] + alpha_in * coeffs_in[:, 3]) 

200 ) 

201 dydx[in_bnds] = ( 

202 coeffs_in[:, 1] 

203 + alpha_in * (2 * coeffs_in[:, 2] + alpha_in * 3 * coeffs_in[:, 3]) 

204 ) / (x_list[i] - x_list[i - 1]) 

205 

206 # Out-of-bounds: bottom 

207 y[out_bot] = coeffs[0, 0] + coeffs[0, 1] * (x_init[out_bot] - x_list[0]) 

208 dydx[out_bot] = coeffs[0, 1] 

209 

210 # Out-of-bounds: top 

211 alpha_top = x_init[out_top] - x_list[n - 1] 

212 y[out_top] = ( 

213 coeffs[n, 0] 

214 + x_init[out_top] * coeffs[n, 1] 

215 - coeffs[n, 2] * np.exp(alpha_top * coeffs[n, 3]) 

216 ) 

217 dydx[out_top] = coeffs[n, 1] - coeffs[n, 2] * coeffs[n, 3] * np.exp( 

218 alpha_top * coeffs[n, 3] 

219 ) 

220 

221 return y, dydx 

222 

223 

224@njit(cache=True, error_model="numpy") 

225def cubic_interp_fast( 

226 x0, 

227 x_list, 

228 y_list, 

229 dydx_list, 

230 intercept_limit=None, 

231 slope_limit=None, 

232 lower_extrap=False, 

233): # pragma: no cover 

234 if intercept_limit is None and slope_limit is None: 

235 slope = dydx_list[-1] 

236 intercept = y_list[-1] - slope * x_list[-1] 

237 

238 return _spline_decay( 

239 x0, x_list, y_list, dydx_list, intercept, slope, lower_extrap 

240 ) 

241 else: 

242 return _spline_decay( 

243 x0, x_list, y_list, dydx_list, intercept_limit, slope_limit, lower_extrap 

244 )