Coverage for HARK / numba_tools.py: 100%

10 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-10 06:19 +0000

1import numpy as np 

2from numba import njit 

3 

4from HARK.rewards import ( 

5 CRRAutility_X, 

6 CRRAutility_inv, 

7 CRRAutility_invP, 

8 CRRAutilityP_X, 

9 CRRAutilityP_inv, 

10 CRRAutilityP_invP, 

11 CRRAutilityPP_X, 

12) 

13 

14CRRAutility = njit(CRRAutility_X, cache=True) 

15CRRAutilityP = njit(CRRAutilityP_X, cache=True) 

16CRRAutilityPP = njit(CRRAutilityPP_X, cache=True) 

17CRRAutilityP_inv = njit(CRRAutilityP_inv, cache=True) 

18CRRAutility_invP = njit(CRRAutility_invP, cache=True) 

19CRRAutility_inv = njit(CRRAutility_inv, cache=True) 

20CRRAutilityP_invP = njit(CRRAutilityP_invP, cache=True) 

21 

22 

23@njit(cache=True, error_model="numpy") 

24def _decay_extrap_coeffs( 

25 x_list, y_list, intercept_limit, slope_limit 

26): # pragma: no cover 

27 # Coefficients for decay extrapolation toward the line y = slope*x + intercept. 

28 slope_at_top = (y_list[-1] - y_list[-2]) / (x_list[-1] - x_list[-2]) 

29 level_diff = intercept_limit + slope_limit * x_list[-1] - y_list[-1] 

30 slope_diff = slope_limit - slope_at_top 

31 decay_extrap_A = level_diff 

32 decay_extrap_B = -slope_diff / level_diff 

33 return decay_extrap_A, decay_extrap_B 

34 

35 

36@njit(cache=True, error_model="numpy") 

37def _interp_linear(x0, x_list, y_list, lower_extrap): # pragma: no cover 

38 i = np.maximum(np.searchsorted(x_list[:-1], x0), 1) 

39 alpha = (x0 - x_list[i - 1]) / (x_list[i] - x_list[i - 1]) 

40 y0 = (1.0 - alpha) * y_list[i - 1] + alpha * y_list[i] 

41 

42 if not lower_extrap: 

43 below_lower_bound = x0 < x_list[0] 

44 y0[below_lower_bound] = np.nan 

45 

46 return y0 

47 

48 

49@njit(cache=True, error_model="numpy") 

50def _apply_y_decay_extrap( 

51 x0, y0, x_list, intercept_limit, slope_limit, decay_extrap_A, decay_extrap_B 

52): # pragma: no cover 

53 above_upper_bound = x0 > x_list[-1] 

54 x_temp = x0[above_upper_bound] - x_list[-1] 

55 y0[above_upper_bound] = ( 

56 intercept_limit 

57 + slope_limit * x0[above_upper_bound] 

58 - decay_extrap_A * np.exp(-decay_extrap_B * x_temp) 

59 ) 

60 return above_upper_bound, x_temp 

61 

62 

63@njit(cache=True, error_model="numpy") 

64def _interp_decay( 

65 x0, x_list, y_list, intercept_limit, slope_limit, lower_extrap 

66): # pragma: no cover 

67 decay_extrap_A, decay_extrap_B = _decay_extrap_coeffs( 

68 x_list, y_list, intercept_limit, slope_limit 

69 ) 

70 y0 = _interp_linear(x0, x_list, y_list, lower_extrap) 

71 _apply_y_decay_extrap( 

72 x0, y0, x_list, intercept_limit, slope_limit, decay_extrap_A, decay_extrap_B 

73 ) 

74 return y0 

75 

76 

77@njit(cache=True, error_model="numpy") 

78def linear_interp_fast( 

79 x0, x_list, y_list, intercept_limit=None, slope_limit=None, lower_extrap=False 

80): # pragma: no cover 

81 if intercept_limit is None and slope_limit is None: 

82 return _interp_linear(x0, x_list, y_list, lower_extrap) 

83 else: 

84 return _interp_decay( 

85 x0, x_list, y_list, intercept_limit, slope_limit, lower_extrap 

86 ) 

87 

88 

89@njit(cache=True, error_model="numpy") 

90def _interp_linear_deriv(x0, x_list, y_list, lower_extrap): # pragma: no cover 

91 i = np.maximum(np.searchsorted(x_list[:-1], x0), 1) 

92 alpha = (x0 - x_list[i - 1]) / (x_list[i] - x_list[i - 1]) 

93 y0 = (1.0 - alpha) * y_list[i - 1] + alpha * y_list[i] 

94 dydx = (y_list[i] - y_list[i - 1]) / (x_list[i] - x_list[i - 1]) 

95 

96 if not lower_extrap: 

97 below_lower_bound = x0 < x_list[0] 

98 y0[below_lower_bound] = np.nan 

99 dydx[below_lower_bound] = np.nan 

100 

101 return y0, dydx 

102 

103 

104@njit(cache=True, error_model="numpy") 

105def _interp_decay_deriv( 

106 x0, x_list, y_list, intercept_limit, slope_limit, lower_extrap 

107): # pragma: no cover 

108 decay_extrap_A, decay_extrap_B = _decay_extrap_coeffs( 

109 x_list, y_list, intercept_limit, slope_limit 

110 ) 

111 y0, dydx = _interp_linear_deriv(x0, x_list, y_list, lower_extrap) 

112 above_upper_bound, x_temp = _apply_y_decay_extrap( 

113 x0, y0, x_list, intercept_limit, slope_limit, decay_extrap_A, decay_extrap_B 

114 ) 

115 dydx[above_upper_bound] = slope_limit + decay_extrap_B * decay_extrap_A * np.exp( 

116 -decay_extrap_B * x_temp 

117 ) 

118 return y0, dydx 

119 

120 

121@njit(cache=True, error_model="numpy") 

122def linear_interp_deriv_fast( 

123 x0, x_list, y_list, intercept_limit=None, slope_limit=None, lower_extrap=False 

124): # pragma: no cover 

125 if intercept_limit is None and slope_limit is None: 

126 return _interp_linear_deriv(x0, x_list, y_list, lower_extrap) 

127 else: 

128 return _interp_decay_deriv( 

129 x0, x_list, y_list, intercept_limit, slope_limit, lower_extrap 

130 ) 

131 

132 

133@njit(cache=True, error_model="numpy") 

134def _spline_decay( 

135 x_init, x_list, y_list, dydx_list, intercept_limit, slope_limit, lower_extrap 

136): # pragma: no cover 

137 n = x_list.size 

138 

139 coeffs = np.empty((n + 1, 4)) 

140 

141 # Define lower extrapolation as linear function (or just NaN) 

142 if lower_extrap: 

143 coeffs[0] = np.array([y_list[0], dydx_list[0], 0, 0]) 

144 else: 

145 coeffs[0] = np.array([np.nan, np.nan, np.nan, np.nan]) 

146 

147 # Calculate interpolation coefficients on segments mapped to [0,1] 

148 xdiff = np.diff(x_list) 

149 ydiff = np.diff(y_list) 

150 dydx0 = dydx_list[:-1] * xdiff 

151 dydx1 = dydx_list[1:] * xdiff 

152 coeffs[1:-1, 0] = y_list[:-1] 

153 coeffs[1:-1, 1] = dydx0 

154 coeffs[1:-1, 2] = 3 * ydiff - 2 * dydx0 - dydx1 

155 coeffs[1:-1, 3] = -2 * ydiff + dydx0 + dydx1 

156 

157 # Calculate extrapolation coefficients as a decay toward limiting function y = mx+b 

158 gap = slope_limit * x_list[n - 1] + intercept_limit - y_list[n - 1] 

159 slope = slope_limit - dydx_list[n - 1] 

160 if (gap != 0) and (slope <= 0): 

161 coeffs[-1] = np.array([intercept_limit, slope_limit, gap, slope / gap]) 

162 elif slope > 0: 

163 # fixing a problem when slope is positive 

164 coeffs[-1] = np.array([intercept_limit, slope_limit, 0, 0]) 

165 else: 

166 coeffs[-1] = np.array([intercept_limit, slope_limit, gap, 0]) 

167 

168 m = len(x_init) 

169 pos = np.searchsorted(x_list, x_init) 

170 y = np.zeros(m) 

171 dydx = np.zeros(m) 

172 

173 if m > 0: 

174 out_bot = pos == 0 

175 out_top = pos == n 

176 in_bnds = np.logical_not(np.logical_or(out_bot, out_top)) 

177 

178 # In-bounds evaluation 

179 i = pos[in_bnds] 

180 coeffs_in = coeffs[i, :] 

181 alpha_in = (x_init[in_bnds] - x_list[i - 1]) / (x_list[i] - x_list[i - 1]) 

182 y[in_bnds] = coeffs_in[:, 0] + alpha_in * ( 

183 coeffs_in[:, 1] + alpha_in * (coeffs_in[:, 2] + alpha_in * coeffs_in[:, 3]) 

184 ) 

185 dydx[in_bnds] = ( 

186 coeffs_in[:, 1] 

187 + alpha_in * (2 * coeffs_in[:, 2] + alpha_in * 3 * coeffs_in[:, 3]) 

188 ) / (x_list[i] - x_list[i - 1]) 

189 

190 # Out-of-bounds: bottom 

191 y[out_bot] = coeffs[0, 0] + coeffs[0, 1] * (x_init[out_bot] - x_list[0]) 

192 dydx[out_bot] = coeffs[0, 1] 

193 

194 # Out-of-bounds: top 

195 alpha_top = x_init[out_top] - x_list[n - 1] 

196 y[out_top] = ( 

197 coeffs[n, 0] 

198 + x_init[out_top] * coeffs[n, 1] 

199 - coeffs[n, 2] * np.exp(alpha_top * coeffs[n, 3]) 

200 ) 

201 dydx[out_top] = coeffs[n, 1] - coeffs[n, 2] * coeffs[n, 3] * np.exp( 

202 alpha_top * coeffs[n, 3] 

203 ) 

204 

205 return y, dydx 

206 

207 

208@njit(cache=True, error_model="numpy") 

209def cubic_interp_fast( 

210 x0, 

211 x_list, 

212 y_list, 

213 dydx_list, 

214 intercept_limit=None, 

215 slope_limit=None, 

216 lower_extrap=False, 

217): # pragma: no cover 

218 if intercept_limit is None and slope_limit is None: 

219 slope = dydx_list[-1] 

220 intercept = y_list[-1] - slope * x_list[-1] 

221 

222 return _spline_decay( 

223 x0, x_list, y_list, dydx_list, intercept, slope, lower_extrap 

224 ) 

225 else: 

226 return _spline_decay( 

227 x0, x_list, y_list, dydx_list, intercept_limit, slope_limit, lower_extrap 

228 )