Coverage for HARK/econforgeinterp.py: 89%
85 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-02 05:14 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-02 05:14 +0000
1import numpy as np
2from interpolation.splines import CGrid, eval_linear, eval_spline
3from interpolation.splines import extrap_options as xto
5from HARK.metric import MetricObject
7extrap_opts = {
8 "linear": xto.LINEAR,
9 "nearest": xto.NEAREST,
10 "constant": xto.CONSTANT,
11}
14class LinearFast(MetricObject):
15 """
16 A class that constructs and holds all the necessary elements to
17 call a multilinear interpolator from econforge.interpolator in
18 a way that resembles the basic interpolators in HARK.interpolation.
19 """
21 distance_criteria = ["f_val", "grid_list"]
23 def __init__(self, f_val, grids, extrap_mode="linear", prebuilt_grid=None):
24 """
25 f_val: numpy.array
26 An array containing the values of the function at the grid points.
27 It's i-th dimension must be of the same lenght as the i-th grid.
28 f_val[i,j,k] must be f(grids[0][i], grids[1][j], grids[2][k]).
29 grids: [numpy.array]
30 One-dimensional list of numpy arrays. It's i-th entry must be the grid
31 to be used for the i-th independent variable.
32 extrap_mode: one of 'linear', 'nearest', or 'constant'
33 Determines how to extrapolate, using either nearest point, multilinear, or
34 constant extrapolation. The default is multilinear.
35 prebuilt_grid: CGrid, optional
36 A prebuilt CGrid object to be used as the grid. If None, a new one
37 will be created using the grids provided. By default None.
38 """
39 self.dim = len(grids)
40 self.f_val = f_val
41 self.grid_list = grids
42 if prebuilt_grid is None:
43 self.Grid = CGrid(*grids)
44 else:
45 self.Grid = prebuilt_grid
47 # Set up extrapolation options
48 self.extrap_mode = extrap_mode
49 try:
50 self.extrap_options = extrap_opts[self.extrap_mode]
51 except KeyError:
52 raise KeyError(
53 'extrap_mode must be one of "linear", "nearest", or "costant"'
54 )
56 def __call__(self, *args):
57 """
58 Calls the interpolator.
60 args: [numpy.array]
61 List of arrays. The i-th entry contains the i-th coordinate
62 of all the points to be evaluated. All entries must have the
63 same shape.
64 """
65 array_args = [np.asarray(x) for x in args]
67 # Call the econforge function
68 f = eval_linear(
69 self.Grid,
70 self.f_val,
71 np.column_stack([x.flatten() for x in array_args]),
72 self.extrap_options,
73 )
75 # Reshape the output to the shape of inputs
76 return np.reshape(f, array_args[0].shape)
78 def _derivs(self, deriv_tuple, *args):
79 """
80 Evaluates derivatives of the interpolator.
82 Parameters
83 ----------
84 deriv_tuple : tuple of tuples of int
85 Indicates what are the derivatives to be computed.
86 It follows econforge's notation, where a derivative
87 to be calculated is a tuple of length equal to the
88 number of dimensions of the interpolator and entries
89 in that tuple represent the order of the derivative.
90 E.g. to calculate f(x,y) and df/dy(x,y) use
91 deriv_tuple = ((0,0),(0,1))
93 args: [numpy.array]
94 List of arrays. The i-th entry contains the i-th coordinate
95 of all the points to be evaluated. All entries must have the
96 same shape.
98 Returns
99 -------
100 [numpy.array]
101 List of the derivatives that were requested in the same order
102 as deriv_tuple. Each element has the shape of items in args.
103 """
105 # Format arguments
106 array_args = [np.asarray(x) for x in args]
108 # Find derivatives with respect to every dimension
109 derivs = eval_spline(
110 self.Grid,
111 self.f_val,
112 np.column_stack([x.flatten() for x in array_args]),
113 out=None,
114 k=1,
115 diff=str(deriv_tuple),
116 extrap_mode=self.extrap_mode,
117 )
119 # Reshape
120 derivs = [
121 derivs[:, j].reshape(args[0].shape) for j, tup in enumerate(deriv_tuple)
122 ]
124 return derivs
126 def gradient(self, *args):
127 """
128 Evaluates gradient of the interpolator.
130 Parameters
131 ----------
132 args: [numpy.array]
133 List of arrays. The i-th entry contains the i-th coordinate
134 of all the points to be evaluated. All entries must have the
135 same shape.
137 Returns
138 -------
139 [numpy.array]
140 List of the derivatives of the function with respect to each
141 input, evaluated at the given points. E.g. if the interpolator
142 represents 3D function f, f.gradient(x,y,z) will return
143 [df/dx(x,y,z), df/dy(x,y,z), df/dz(x,y,z)]. Each element has the
144 shape of items in args.
145 """
146 # Form a tuple that indicates which derivatives to get
147 # in the way eval_linear expects
148 deriv_tup = tuple(
149 tuple(1 if j == i else 0 for j in range(self.dim)) for i in range(self.dim)
150 )
152 return self._derivs(deriv_tup, *args)
154 def _eval_and_grad(self, *args):
155 """
156 Evaluates interpolator and its gradient.
158 Parameters
159 ----------
160 args: [numpy.array]
161 List of arrays. The i-th entry contains the i-th coordinate
162 of all the points to be evaluated. All entries must have the
163 same shape.
165 Returns
166 -------
167 numpy.array
168 Value of the interpolator at given arguments.
169 [numpy.array]
170 List of the derivatives of the function with respect to each
171 input, evaluated at the given points. E.g. if the interpolator
172 represents 3D function f, the list will be
173 [df/dx(x,y,z), df/dy(x,y,z), df/dz(x,y,z)]. Each element has the
174 shape of items in args.
175 """
176 # (0,0,...,0) to get the function evaluation
177 eval_tup = tuple([tuple(0 for i in range(self.dim))])
179 # Tuple with indicators for all the derivatives
180 deriv_tup = tuple(
181 tuple(1 if j == i else 0 for j in range(self.dim)) for i in range(self.dim)
182 )
184 results = self._derivs(eval_tup + deriv_tup, *args)
186 return (results[0], results[1:])
189class DecayInterp(MetricObject):
190 """
191 A class of interpolators that use a limiting function
192 for extrapolation.
194 See HARK/examples/Interpolation/DecayInterp.ipynb for examples of
195 how to use this class.
197 """
199 distance_criteria = ["interp"]
201 def __init__(
202 self,
203 interp,
204 limit_fun,
205 limit_grad=None,
206 extrap_method="decay_prop",
207 ):
208 """
210 Parameters
211 ----------
212 interp : N-dim LinearFast object
213 Linear interpolator
214 limit_fun : N-dim function
215 Limiting function to be used when extrapolating
216 limit_grad : function, optional
217 Function that returns the gradient of the limiting function. Must
218 follow the convention of LinearFast's gradients, where the gradient
219 takes the form of a list of arrays. By default None
220 extrap_method : str, optional
221 Method to use for calculating extrapolated values. Must be one of
222 "decay_prop", "decay_hark", "paste". By default "decay_prop"
223 See HARK/examples/interpolation/DecayInterp.ipynb, for detailed
224 explanations of each method.
225 """
226 self.interp = interp
227 self.limit_fun = limit_fun
229 self.limit_grad = limit_grad
231 self.grid_list = self.interp.grid_list
233 self.upper_limits = np.array([x[-1] for x in self.grid_list])
234 self.dim = len(self.grid_list)
236 self.extrap_methods = {
237 "decay_prop": self.extrap_decay_prop,
238 "decay_hark": self.extrap_decay_hark,
239 "paste": self.extrap_paste,
240 }
242 try:
243 self.extrap_fun = self.extrap_methods[extrap_method]
244 except KeyError:
245 raise KeyError(
246 'extrap_method must be one of "decay_prop", "decay_hark", or "paste"'
247 )
249 def __call__(self, *args):
250 """
251 Calls the interpolator with decay extrapolation.
253 args: [numpy.array]
254 List of arrays. The i-th entry contains the i-th coordinate
255 of all the points to be evaluated. All entries must have the
256 same shape.
257 """
259 # Save the shape of the arguments
260 argshape = np.asarray(args[0]).shape
261 # Save in a matrix: rows are points, columns are dimensions
262 col_args = np.column_stack([np.asarray(x).flatten() for x in args])
264 # Get indices, points, and closest in-grid point to points that
265 # require extrapolation.
266 upper_ex_inds = np.any(col_args > self.upper_limits[None, :], axis=1)
267 upper_ex_points = col_args[upper_ex_inds,]
268 upper_ex_nearest = np.minimum(upper_ex_points, self.upper_limits[None, :])
270 # Find function evaluations with regular extrapolation
271 f = self.interp(*[col_args[:, i] for i in range(self.dim)])
273 # Find extrapolated values with chosen method
274 f[upper_ex_inds] = self.extrap_fun(upper_ex_points, upper_ex_nearest)
276 return np.reshape(f, argshape)
278 def extrap_decay_prop(self, x, closest_x):
279 """
280 "decay_prop" extrapolation method. Combines the interpolator's
281 default extrapolation and the limiting function with weights
282 that depend on how far from the grid the values are.
284 Parameters
285 ----------
286 x : inputs that require extrapolation.
287 closest_x : for each of the inputs that require extrapolation, contains
288 the closest point that falls inside the grid.
289 """
290 # Evaluate base interpolator at x
291 f_val_x = self.interp(*[x[:, i] for i in range(self.dim)])
292 # Evaluate limiting function at x
293 g_val_x = self.limit_fun(*[x[:, i] for i in range(self.dim)])
295 # Find distance between points and closest in-grid point.
296 decay_weights = np.abs(1 / self.upper_limits) # Rescale as proportions
297 dist = np.dot(np.abs(x - closest_x), decay_weights)
298 weight = np.exp(-1 * dist)
300 return weight * f_val_x + (1 - weight) * g_val_x
302 def extrap_decay_hark(self, x, closest_x):
303 """
304 "decay_hark" extrapolation method. Takes into account the rate at
305 which the interpolator and limiting function are approaching at the
306 edge of the grid for combining them.
308 Parameters
309 ----------
310 x : inputs that require extrapolation.
311 closest_x : for each of the inputs that require extrapolation, contains
312 the closest point that falls inside the grid.
313 """
315 # Evaluate limiting function at x
316 g_val_x = self.limit_fun(*[x[:, i] for i in range(self.dim)])
318 # Get gradients and values at the closest in-grid point
319 closest_x_arglist = [closest_x[:, i][..., None] for i in range(self.dim)]
321 # Interpolator
322 f_val, f_grad = self.interp._eval_and_grad(*closest_x_arglist)
323 f_grad = np.hstack(f_grad)
324 # Limit
325 g_val = self.limit_fun(*closest_x_arglist)
326 g_grad = self.limit_grad(*closest_x_arglist)
327 g_grad = np.hstack(g_grad)
329 # Construct weights
330 A = g_val - f_val
331 B = np.abs(np.divide(1, A) * (g_grad - f_grad))
332 # Distance weighted by B
333 w_dist = np.sum(B * (x - closest_x), axis=1, keepdims=True)
334 # If f and g start out together at the edge of the grid, treat
335 # the point as infinitely far away so that the limiting value is used.
336 w_dist[A.flatten() == 0.0, ...] = np.inf
338 # Combine the limit value at x and the values at the
339 # edge of the grid
340 val = g_val_x[..., None] - A * np.exp(-1.0 * w_dist)
342 return val.flatten()
344 def extrap_paste(self, x, closest_x):
345 """
346 "paste" extrapolation method. Uses the limiting function
347 for extrapolation, but with a vertical shift that "pastes" it
348 to the interpolator at the edge of the grid.
350 Parameters
351 ----------
352 x : inputs that require extrapolation.
353 closest_x : for each of the inputs that require extrapolation, contains
354 the closest point that falls inside the grid.
355 """
356 # Evaluate base interpolator and limit at closest x
357 f_val_closest = self.interp(*[closest_x[:, i] for i in range(self.dim)])
358 g_val_closest = self.limit_fun(*[closest_x[:, i] for i in range(self.dim)])
360 # Evaluate limit function at x
361 g_val_x = self.limit_fun(*[x[:, i] for i in range(self.dim)])
363 return f_val_closest + (g_val_x - g_val_closest)