Coverage for HARK/numba_tools.py: 100%

10 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-02 05:14 +0000

1import numpy as np 

2from numba import njit 

3 

4from HARK.rewards import ( 

5 CRRAutility, 

6 CRRAutility_inv, 

7 CRRAutility_invP, 

8 CRRAutilityP, 

9 CRRAutilityP_inv, 

10 CRRAutilityP_invP, 

11 CRRAutilityPP, 

12) 

13 

14CRRAutility = njit(CRRAutility, cache=True) 

15CRRAutilityP = njit(CRRAutilityP, cache=True) 

16CRRAutilityPP = njit(CRRAutilityPP, cache=True) 

17CRRAutilityP_inv = njit(CRRAutilityP_inv, cache=True) 

18CRRAutility_invP = njit(CRRAutility_invP, cache=True) 

19CRRAutility_inv = njit(CRRAutility_inv, cache=True) 

20CRRAutilityP_invP = njit(CRRAutilityP_invP, cache=True) 

21 

22 

23@njit(cache=True, error_model="numpy") 

24def _interp_decay( 

25 x0, x_list, y_list, intercept_limit, slope_limit, lower_extrap 

26): # pragma: no cover 

27 # Make a decay extrapolation 

28 slope_at_top = (y_list[-1] - y_list[-2]) / (x_list[-1] - x_list[-2]) 

29 level_diff = intercept_limit + slope_limit * x_list[-1] - y_list[-1] 

30 slope_diff = slope_limit - slope_at_top 

31 

32 decay_extrap_A = level_diff 

33 decay_extrap_B = -slope_diff / level_diff 

34 intercept_limit = intercept_limit 

35 slope_limit = slope_limit 

36 

37 i = np.maximum(np.searchsorted(x_list[:-1], x0), 1) 

38 alpha = (x0 - x_list[i - 1]) / (x_list[i] - x_list[i - 1]) 

39 y0 = (1.0 - alpha) * y_list[i - 1] + alpha * y_list[i] 

40 

41 if not lower_extrap: 

42 below_lower_bound = x0 < x_list[0] 

43 y0[below_lower_bound] = np.nan 

44 

45 above_upper_bound = x0 > x_list[-1] 

46 x_temp = x0[above_upper_bound] - x_list[-1] 

47 

48 y0[above_upper_bound] = ( 

49 intercept_limit 

50 + slope_limit * x0[above_upper_bound] 

51 - decay_extrap_A * np.exp(-decay_extrap_B * x_temp) 

52 ) 

53 

54 return y0 

55 

56 

57@njit(cache=True, error_model="numpy") 

58def _interp_linear(x0, x_list, y_list, lower_extrap): # pragma: no cover 

59 i = np.maximum(np.searchsorted(x_list[:-1], x0), 1) 

60 alpha = (x0 - x_list[i - 1]) / (x_list[i] - x_list[i - 1]) 

61 y0 = (1.0 - alpha) * y_list[i - 1] + alpha * y_list[i] 

62 

63 if not lower_extrap: 

64 below_lower_bound = x0 < x_list[0] 

65 y0[below_lower_bound] = np.nan 

66 

67 return y0 

68 

69 

70@njit(cache=True, error_model="numpy") 

71def linear_interp_fast( 

72 x0, x_list, y_list, intercept_limit=None, slope_limit=None, lower_extrap=False 

73): # pragma: no cover 

74 if intercept_limit is None and slope_limit is None: 

75 return _interp_linear(x0, x_list, y_list, lower_extrap) 

76 else: 

77 return _interp_decay( 

78 x0, x_list, y_list, intercept_limit, slope_limit, lower_extrap 

79 ) 

80 

81 

82@njit(cache=True, error_model="numpy") 

83def _interp_linear_deriv(x0, x_list, y_list, lower_extrap): # pragma: no cover 

84 i = np.maximum(np.searchsorted(x_list[:-1], x0), 1) 

85 alpha = (x0 - x_list[i - 1]) / (x_list[i] - x_list[i - 1]) 

86 y0 = (1.0 - alpha) * y_list[i - 1] + alpha * y_list[i] 

87 dydx = (y_list[i] - y_list[i - 1]) / (x_list[i] - x_list[i - 1]) 

88 

89 if not lower_extrap: 

90 below_lower_bound = x0 < x_list[0] 

91 y0[below_lower_bound] = np.nan 

92 dydx[below_lower_bound] = np.nan 

93 

94 return y0, dydx 

95 

96 

97@njit(cache=True, error_model="numpy") 

98def _interp_decay_deriv( 

99 x0, x_list, y_list, intercept_limit, slope_limit, lower_extrap 

100): # pragma: no cover 

101 # Make a decay extrapolation 

102 slope_at_top = (y_list[-1] - y_list[-2]) / (x_list[-1] - x_list[-2]) 

103 level_diff = intercept_limit + slope_limit * x_list[-1] - y_list[-1] 

104 slope_diff = slope_limit - slope_at_top 

105 

106 decay_extrap_A = level_diff 

107 decay_extrap_B = -slope_diff / level_diff 

108 intercept_limit = intercept_limit 

109 slope_limit = slope_limit 

110 

111 i = np.maximum(np.searchsorted(x_list[:-1], x0), 1) 

112 alpha = (x0 - x_list[i - 1]) / (x_list[i] - x_list[i - 1]) 

113 y0 = (1.0 - alpha) * y_list[i - 1] + alpha * y_list[i] 

114 dydx = (y_list[i] - y_list[i - 1]) / (x_list[i] - x_list[i - 1]) 

115 

116 if not lower_extrap: 

117 below_lower_bound = x0 < x_list[0] 

118 y0[below_lower_bound] = np.nan 

119 dydx[below_lower_bound] = np.nan 

120 

121 above_upper_bound = x0 > x_list[-1] 

122 x_temp = x0[above_upper_bound] - x_list[-1] 

123 

124 y0[above_upper_bound] = ( 

125 intercept_limit 

126 + slope_limit * x0[above_upper_bound] 

127 - decay_extrap_A * np.exp(-decay_extrap_B * x_temp) 

128 ) 

129 

130 dydx[above_upper_bound] = slope_limit + decay_extrap_B * decay_extrap_A * np.exp( 

131 -decay_extrap_B * x_temp 

132 ) 

133 

134 return y0, dydx 

135 

136 

137@njit(cache=True, error_model="numpy") 

138def linear_interp_deriv_fast( 

139 x0, x_list, y_list, intercept_limit=None, slope_limit=None, lower_extrap=False 

140): # pragma: no cover 

141 if intercept_limit is None and slope_limit is None: 

142 return _interp_linear_deriv(x0, x_list, y_list, lower_extrap) 

143 else: 

144 return _interp_decay_deriv( 

145 x0, x_list, y_list, intercept_limit, slope_limit, lower_extrap 

146 ) 

147 

148 

149@njit(cache=True, error_model="numpy") 

150def _spline_decay( 

151 x_init, x_list, y_list, dydx_list, intercept_limit, slope_limit, lower_extrap 

152): # pragma: no cover 

153 n = x_list.size 

154 

155 coeffs = np.empty((n + 1, 4)) 

156 

157 # Define lower extrapolation as linear function (or just NaN) 

158 if lower_extrap: 

159 coeffs[0] = np.array([y_list[0], dydx_list[0], 0, 0]) 

160 else: 

161 coeffs[0] = np.array([np.nan, np.nan, np.nan, np.nan]) 

162 

163 # Calculate interpolation coefficients on segments mapped to [0,1] 

164 xdiff = np.diff(x_list) 

165 ydiff = np.diff(y_list) 

166 dydx0 = dydx_list[:-1] * xdiff 

167 dydx1 = dydx_list[1:] * xdiff 

168 coeffs[1:-1, 0] = y_list[:-1] 

169 coeffs[1:-1, 1] = dydx0 

170 coeffs[1:-1, 2] = 3 * ydiff - 2 * dydx0 - dydx1 

171 coeffs[1:-1, 3] = -2 * ydiff + dydx0 + dydx1 

172 

173 # Calculate extrapolation coefficients as a decay toward limiting function y = mx+b 

174 gap = slope_limit * x_list[n - 1] + intercept_limit - y_list[n - 1] 

175 slope = slope_limit - dydx_list[n - 1] 

176 if (gap != 0) and (slope <= 0): 

177 coeffs[-1] = np.array([intercept_limit, slope_limit, gap, slope / gap]) 

178 elif slope > 0: 

179 # fixing a problem when slope is positive 

180 coeffs[-1] = np.array([intercept_limit, slope_limit, 0, 0]) 

181 else: 

182 coeffs[-1] = np.array([intercept_limit, slope_limit, gap, 0]) 

183 

184 m = len(x_init) 

185 pos = np.searchsorted(x_list, x_init) 

186 y = np.zeros(m) 

187 if y.size > 0: 

188 out_bot = pos == 0 

189 out_top = pos == n 

190 in_bnds = np.logical_not(np.logical_or(out_bot, out_top)) 

191 

192 # Do the "in bounds" evaluation points 

193 i = pos[in_bnds] 

194 coeffs_in = coeffs[i, :] 

195 alpha = (x_init[in_bnds] - x_list[i - 1]) / (x_list[i] - x_list[i - 1]) 

196 y[in_bnds] = coeffs_in[:, 0] + alpha * ( 

197 coeffs_in[:, 1] + alpha * (coeffs_in[:, 2] + alpha * coeffs_in[:, 3]) 

198 ) 

199 

200 # Do the "out of bounds" evaluation points 

201 y[out_bot] = coeffs[0, 0] + coeffs[0, 1] * (x_init[out_bot] - x_list[0]) 

202 alpha = x_init[out_top] - x_list[n - 1] 

203 y[out_top] = ( 

204 coeffs[n, 0] 

205 + x_init[out_top] * coeffs[n, 1] 

206 - coeffs[n, 2] * np.exp(alpha * coeffs[n, 3]) 

207 ) 

208 

209 dydx = np.zeros(m) 

210 if dydx.size > 0: 

211 out_bot = pos == 0 

212 out_top = pos == n 

213 in_bnds = np.logical_not(np.logical_or(out_bot, out_top)) 

214 

215 # Do the "in bounds" evaluation points 

216 i = pos[in_bnds] 

217 coeffs_in = coeffs[i, :] 

218 alpha = (x_init[in_bnds] - x_list[i - 1]) / (x_list[i] - x_list[i - 1]) 

219 dydx[in_bnds] = ( 

220 coeffs_in[:, 1] 

221 + alpha * (2 * coeffs_in[:, 2] + alpha * 3 * coeffs_in[:, 3]) 

222 ) / (x_list[i] - x_list[i - 1]) 

223 

224 # Do the "out of bounds" evaluation points 

225 dydx[out_bot] = coeffs[0, 1] 

226 alpha = x_init[out_top] - x_list[n - 1] 

227 dydx[out_top] = coeffs[n, 1] - coeffs[n, 2] * coeffs[n, 3] * np.exp( 

228 alpha * coeffs[n, 3] 

229 ) 

230 

231 return y, dydx 

232 

233 

234@njit(cache=True, error_model="numpy") 

235def cubic_interp_fast( 

236 x0, 

237 x_list, 

238 y_list, 

239 dydx_list, 

240 intercept_limit=None, 

241 slope_limit=None, 

242 lower_extrap=False, 

243): # pragma: no cover 

244 if intercept_limit is None and slope_limit is None: 

245 slope = dydx_list[-1] 

246 intercept = y_list[-1] - slope * x_list[-1] 

247 

248 return _spline_decay( 

249 x0, x_list, y_list, dydx_list, intercept, slope, lower_extrap 

250 ) 

251 else: 

252 return _spline_decay( 

253 x0, x_list, y_list, dydx_list, intercept_limit, slope_limit, lower_extrap 

254 )