Coverage for HARK/econforgeinterp.py: 89%

85 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-02 05:14 +0000

1import numpy as np 

2from interpolation.splines import CGrid, eval_linear, eval_spline 

3from interpolation.splines import extrap_options as xto 

4 

5from HARK.metric import MetricObject 

6 

7extrap_opts = { 

8 "linear": xto.LINEAR, 

9 "nearest": xto.NEAREST, 

10 "constant": xto.CONSTANT, 

11} 

12 

13 

14class LinearFast(MetricObject): 

15 """ 

16 A class that constructs and holds all the necessary elements to 

17 call a multilinear interpolator from econforge.interpolator in 

18 a way that resembles the basic interpolators in HARK.interpolation. 

19 """ 

20 

21 distance_criteria = ["f_val", "grid_list"] 

22 

23 def __init__(self, f_val, grids, extrap_mode="linear", prebuilt_grid=None): 

24 """ 

25 f_val: numpy.array 

26 An array containing the values of the function at the grid points. 

27 It's i-th dimension must be of the same lenght as the i-th grid. 

28 f_val[i,j,k] must be f(grids[0][i], grids[1][j], grids[2][k]). 

29 grids: [numpy.array] 

30 One-dimensional list of numpy arrays. It's i-th entry must be the grid 

31 to be used for the i-th independent variable. 

32 extrap_mode: one of 'linear', 'nearest', or 'constant' 

33 Determines how to extrapolate, using either nearest point, multilinear, or 

34 constant extrapolation. The default is multilinear. 

35 prebuilt_grid: CGrid, optional 

36 A prebuilt CGrid object to be used as the grid. If None, a new one 

37 will be created using the grids provided. By default None. 

38 """ 

39 self.dim = len(grids) 

40 self.f_val = f_val 

41 self.grid_list = grids 

42 if prebuilt_grid is None: 

43 self.Grid = CGrid(*grids) 

44 else: 

45 self.Grid = prebuilt_grid 

46 

47 # Set up extrapolation options 

48 self.extrap_mode = extrap_mode 

49 try: 

50 self.extrap_options = extrap_opts[self.extrap_mode] 

51 except KeyError: 

52 raise KeyError( 

53 'extrap_mode must be one of "linear", "nearest", or "costant"' 

54 ) 

55 

56 def __call__(self, *args): 

57 """ 

58 Calls the interpolator. 

59 

60 args: [numpy.array] 

61 List of arrays. The i-th entry contains the i-th coordinate 

62 of all the points to be evaluated. All entries must have the 

63 same shape. 

64 """ 

65 array_args = [np.asarray(x) for x in args] 

66 

67 # Call the econforge function 

68 f = eval_linear( 

69 self.Grid, 

70 self.f_val, 

71 np.column_stack([x.flatten() for x in array_args]), 

72 self.extrap_options, 

73 ) 

74 

75 # Reshape the output to the shape of inputs 

76 return np.reshape(f, array_args[0].shape) 

77 

78 def _derivs(self, deriv_tuple, *args): 

79 """ 

80 Evaluates derivatives of the interpolator. 

81 

82 Parameters 

83 ---------- 

84 deriv_tuple : tuple of tuples of int 

85 Indicates what are the derivatives to be computed. 

86 It follows econforge's notation, where a derivative 

87 to be calculated is a tuple of length equal to the 

88 number of dimensions of the interpolator and entries 

89 in that tuple represent the order of the derivative. 

90 E.g. to calculate f(x,y) and df/dy(x,y) use 

91 deriv_tuple = ((0,0),(0,1)) 

92 

93 args: [numpy.array] 

94 List of arrays. The i-th entry contains the i-th coordinate 

95 of all the points to be evaluated. All entries must have the 

96 same shape. 

97 

98 Returns 

99 ------- 

100 [numpy.array] 

101 List of the derivatives that were requested in the same order 

102 as deriv_tuple. Each element has the shape of items in args. 

103 """ 

104 

105 # Format arguments 

106 array_args = [np.asarray(x) for x in args] 

107 

108 # Find derivatives with respect to every dimension 

109 derivs = eval_spline( 

110 self.Grid, 

111 self.f_val, 

112 np.column_stack([x.flatten() for x in array_args]), 

113 out=None, 

114 k=1, 

115 diff=str(deriv_tuple), 

116 extrap_mode=self.extrap_mode, 

117 ) 

118 

119 # Reshape 

120 derivs = [ 

121 derivs[:, j].reshape(args[0].shape) for j, tup in enumerate(deriv_tuple) 

122 ] 

123 

124 return derivs 

125 

126 def gradient(self, *args): 

127 """ 

128 Evaluates gradient of the interpolator. 

129 

130 Parameters 

131 ---------- 

132 args: [numpy.array] 

133 List of arrays. The i-th entry contains the i-th coordinate 

134 of all the points to be evaluated. All entries must have the 

135 same shape. 

136 

137 Returns 

138 ------- 

139 [numpy.array] 

140 List of the derivatives of the function with respect to each 

141 input, evaluated at the given points. E.g. if the interpolator 

142 represents 3D function f, f.gradient(x,y,z) will return 

143 [df/dx(x,y,z), df/dy(x,y,z), df/dz(x,y,z)]. Each element has the 

144 shape of items in args. 

145 """ 

146 # Form a tuple that indicates which derivatives to get 

147 # in the way eval_linear expects 

148 deriv_tup = tuple( 

149 tuple(1 if j == i else 0 for j in range(self.dim)) for i in range(self.dim) 

150 ) 

151 

152 return self._derivs(deriv_tup, *args) 

153 

154 def _eval_and_grad(self, *args): 

155 """ 

156 Evaluates interpolator and its gradient. 

157 

158 Parameters 

159 ---------- 

160 args: [numpy.array] 

161 List of arrays. The i-th entry contains the i-th coordinate 

162 of all the points to be evaluated. All entries must have the 

163 same shape. 

164 

165 Returns 

166 ------- 

167 numpy.array 

168 Value of the interpolator at given arguments. 

169 [numpy.array] 

170 List of the derivatives of the function with respect to each 

171 input, evaluated at the given points. E.g. if the interpolator 

172 represents 3D function f, the list will be 

173 [df/dx(x,y,z), df/dy(x,y,z), df/dz(x,y,z)]. Each element has the 

174 shape of items in args. 

175 """ 

176 # (0,0,...,0) to get the function evaluation 

177 eval_tup = tuple([tuple(0 for i in range(self.dim))]) 

178 

179 # Tuple with indicators for all the derivatives 

180 deriv_tup = tuple( 

181 tuple(1 if j == i else 0 for j in range(self.dim)) for i in range(self.dim) 

182 ) 

183 

184 results = self._derivs(eval_tup + deriv_tup, *args) 

185 

186 return (results[0], results[1:]) 

187 

188 

189class DecayInterp(MetricObject): 

190 """ 

191 A class of interpolators that use a limiting function 

192 for extrapolation. 

193 

194 See HARK/examples/Interpolation/DecayInterp.ipynb for examples of 

195 how to use this class. 

196 

197 """ 

198 

199 distance_criteria = ["interp"] 

200 

201 def __init__( 

202 self, 

203 interp, 

204 limit_fun, 

205 limit_grad=None, 

206 extrap_method="decay_prop", 

207 ): 

208 """ 

209 

210 Parameters 

211 ---------- 

212 interp : N-dim LinearFast object 

213 Linear interpolator 

214 limit_fun : N-dim function 

215 Limiting function to be used when extrapolating 

216 limit_grad : function, optional 

217 Function that returns the gradient of the limiting function. Must 

218 follow the convention of LinearFast's gradients, where the gradient 

219 takes the form of a list of arrays. By default None 

220 extrap_method : str, optional 

221 Method to use for calculating extrapolated values. Must be one of 

222 "decay_prop", "decay_hark", "paste". By default "decay_prop" 

223 See HARK/examples/interpolation/DecayInterp.ipynb, for detailed 

224 explanations of each method. 

225 """ 

226 self.interp = interp 

227 self.limit_fun = limit_fun 

228 

229 self.limit_grad = limit_grad 

230 

231 self.grid_list = self.interp.grid_list 

232 

233 self.upper_limits = np.array([x[-1] for x in self.grid_list]) 

234 self.dim = len(self.grid_list) 

235 

236 self.extrap_methods = { 

237 "decay_prop": self.extrap_decay_prop, 

238 "decay_hark": self.extrap_decay_hark, 

239 "paste": self.extrap_paste, 

240 } 

241 

242 try: 

243 self.extrap_fun = self.extrap_methods[extrap_method] 

244 except KeyError: 

245 raise KeyError( 

246 'extrap_method must be one of "decay_prop", "decay_hark", or "paste"' 

247 ) 

248 

249 def __call__(self, *args): 

250 """ 

251 Calls the interpolator with decay extrapolation. 

252 

253 args: [numpy.array] 

254 List of arrays. The i-th entry contains the i-th coordinate 

255 of all the points to be evaluated. All entries must have the 

256 same shape. 

257 """ 

258 

259 # Save the shape of the arguments 

260 argshape = np.asarray(args[0]).shape 

261 # Save in a matrix: rows are points, columns are dimensions 

262 col_args = np.column_stack([np.asarray(x).flatten() for x in args]) 

263 

264 # Get indices, points, and closest in-grid point to points that 

265 # require extrapolation. 

266 upper_ex_inds = np.any(col_args > self.upper_limits[None, :], axis=1) 

267 upper_ex_points = col_args[upper_ex_inds,] 

268 upper_ex_nearest = np.minimum(upper_ex_points, self.upper_limits[None, :]) 

269 

270 # Find function evaluations with regular extrapolation 

271 f = self.interp(*[col_args[:, i] for i in range(self.dim)]) 

272 

273 # Find extrapolated values with chosen method 

274 f[upper_ex_inds] = self.extrap_fun(upper_ex_points, upper_ex_nearest) 

275 

276 return np.reshape(f, argshape) 

277 

278 def extrap_decay_prop(self, x, closest_x): 

279 """ 

280 "decay_prop" extrapolation method. Combines the interpolator's 

281 default extrapolation and the limiting function with weights 

282 that depend on how far from the grid the values are. 

283 

284 Parameters 

285 ---------- 

286 x : inputs that require extrapolation. 

287 closest_x : for each of the inputs that require extrapolation, contains 

288 the closest point that falls inside the grid. 

289 """ 

290 # Evaluate base interpolator at x 

291 f_val_x = self.interp(*[x[:, i] for i in range(self.dim)]) 

292 # Evaluate limiting function at x 

293 g_val_x = self.limit_fun(*[x[:, i] for i in range(self.dim)]) 

294 

295 # Find distance between points and closest in-grid point. 

296 decay_weights = np.abs(1 / self.upper_limits) # Rescale as proportions 

297 dist = np.dot(np.abs(x - closest_x), decay_weights) 

298 weight = np.exp(-1 * dist) 

299 

300 return weight * f_val_x + (1 - weight) * g_val_x 

301 

302 def extrap_decay_hark(self, x, closest_x): 

303 """ 

304 "decay_hark" extrapolation method. Takes into account the rate at 

305 which the interpolator and limiting function are approaching at the 

306 edge of the grid for combining them. 

307 

308 Parameters 

309 ---------- 

310 x : inputs that require extrapolation. 

311 closest_x : for each of the inputs that require extrapolation, contains 

312 the closest point that falls inside the grid. 

313 """ 

314 

315 # Evaluate limiting function at x 

316 g_val_x = self.limit_fun(*[x[:, i] for i in range(self.dim)]) 

317 

318 # Get gradients and values at the closest in-grid point 

319 closest_x_arglist = [closest_x[:, i][..., None] for i in range(self.dim)] 

320 

321 # Interpolator 

322 f_val, f_grad = self.interp._eval_and_grad(*closest_x_arglist) 

323 f_grad = np.hstack(f_grad) 

324 # Limit 

325 g_val = self.limit_fun(*closest_x_arglist) 

326 g_grad = self.limit_grad(*closest_x_arglist) 

327 g_grad = np.hstack(g_grad) 

328 

329 # Construct weights 

330 A = g_val - f_val 

331 B = np.abs(np.divide(1, A) * (g_grad - f_grad)) 

332 # Distance weighted by B 

333 w_dist = np.sum(B * (x - closest_x), axis=1, keepdims=True) 

334 # If f and g start out together at the edge of the grid, treat 

335 # the point as infinitely far away so that the limiting value is used. 

336 w_dist[A.flatten() == 0.0, ...] = np.inf 

337 

338 # Combine the limit value at x and the values at the 

339 # edge of the grid 

340 val = g_val_x[..., None] - A * np.exp(-1.0 * w_dist) 

341 

342 return val.flatten() 

343 

344 def extrap_paste(self, x, closest_x): 

345 """ 

346 "paste" extrapolation method. Uses the limiting function 

347 for extrapolation, but with a vertical shift that "pastes" it 

348 to the interpolator at the edge of the grid. 

349 

350 Parameters 

351 ---------- 

352 x : inputs that require extrapolation. 

353 closest_x : for each of the inputs that require extrapolation, contains 

354 the closest point that falls inside the grid. 

355 """ 

356 # Evaluate base interpolator and limit at closest x 

357 f_val_closest = self.interp(*[closest_x[:, i] for i in range(self.dim)]) 

358 g_val_closest = self.limit_fun(*[closest_x[:, i] for i in range(self.dim)]) 

359 

360 # Evaluate limit function at x 

361 g_val_x = self.limit_fun(*[x[:, i] for i in range(self.dim)]) 

362 

363 return f_val_closest + (g_val_x - g_val_closest)