Coverage for HARK/model.py: 97%
148 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-02 05:14 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-02 05:14 +0000
1"""
2Tools for crafting models.
3"""
5from dataclasses import dataclass, field, replace
6from copy import copy, deepcopy
7from HARK.distributions import (
8 Distribution,
9 DiscreteDistributionLabeled,
10 combine_indep_dstns,
11 expected,
12)
13from inspect import signature
14import numpy as np
15from HARK.parser import math_text_to_lambda
16from typing import Any, Callable, Mapping, List, Union
19class Aggregate:
20 """
21 Used to designate a shock as an aggregate shock.
22 If so designated, draws from the shock will be scalar rather
23 than array valued.
24 """
26 def __init__(self, dist: Distribution):
27 self.dist = dist
30class Control:
31 """
32 Used to designate a variabel that is a control variable.
34 Parameters
35 ----------
36 args : list of str
37 The labels of the variables that are in the information set of this control.
38 """
40 def __init__(self, args):
41 pass
44def discretized_shock_dstn(shocks, disc_params):
45 """
46 Discretizes a collection of independent shocks and combines
47 them into one DiscreteDistributionLabeled.
49 Shocks are discretized only if they have a corresponding
50 element of disc_params defined.
52 Parameters
53 -----------
54 shocks: dict of Distribution
55 A dictionary of Distributions, representing independent exogenous shocks.
57 disc_params: dict of dict
58 A dictionary of dictionaries with arguments to Distribution.discretize.
59 Keys of this dictionary should be shared with the shocks argument.
60 """
61 dshocks = {}
63 for shockn in shocks:
64 if shockn == "live": # hacky hack
65 pass
66 elif shockn in disc_params:
67 dshocks[shockn] = DiscreteDistributionLabeled.from_unlabeled(
68 shocks[shockn].discretize(**disc_params[shockn]), var_names=[shockn]
69 )
70 else:
71 # assume already discrete
72 dshocks[shockn] = DiscreteDistributionLabeled.from_unlabeled(
73 shocks[shockn], var_names=[shockn]
74 )
76 all_shock_dstn = combine_indep_dstns(*dshocks.values())
78 return all_shock_dstn
81def construct_shocks(shock_data, scope):
82 """
83 Returns a dictionary from shock labels to Distributions.
85 When the corresponding value in shock_data contains
86 a distribution constructor and input information,
87 any symbolic expressions used in the inputs are
88 evaluated in the provided scope.
90 Parameters
91 ------------
93 shock_data: Mapping(str, Distribution or tuple)
94 A mapping from variable names to Distribution objects,
95 representing exogenous shocks.
97 Optionally, the mapping can be to tuples of Distribution
98 constructors and dictionary of input arguments.
99 In this case, the dictionary can map argument names to
100 numbers, or to strings. The strings are parsed as
101 mathematical expressions and evaluated in the scope
102 of a calibration dictionary.
104 scope: dict(str, values)
105 Variables assigned to numerical values.
106 The scope in which expressions will be evaluated
107 """
108 sd = deepcopy(shock_data)
110 for v in sd:
111 if isinstance(sd[v], tuple):
112 dist_class = sd[v][0]
114 dist_args = sd[v][1] # should be a dictionary
116 for a in dist_args:
117 if isinstance(dist_args[a], str):
118 arg_lambda = math_text_to_lambda(dist_args[a])
119 arg_value = arg_lambda(
120 *[scope[var] for var in signature(arg_lambda).parameters]
121 )
123 dist_args[a] = arg_value
125 dist = dist_class(**dist_args)
127 sd[v] = dist
129 return sd
132def simulate_dynamics(
133 dynamics: Mapping[str, Union[Callable, Control]],
134 pre: Mapping[str, Any],
135 dr: Mapping[str, Callable],
136):
137 """
138 From the beginning-of-period state (pre), follow the dynamics,
139 including any decision rules, to compute the end-of-period state.
141 Parameters
142 ------------
144 dynamics: Mapping[str, Callable]
145 Maps variable names to functions from variables to values.
146 Can include Controls
147 ## TODO: Make collection of equations into a named type
150 pre : Mapping[str, Any]
151 Bound values for all variables that must be known before beginning the period's dynamics.
154 dr : Mapping[str, Callable]
155 Decision rules for all the Control variables in the dynamics.
156 """
157 vals = pre.copy()
159 for varn in dynamics:
160 # Using the fact that Python dictionaries are ordered
162 feq = dynamics[varn]
164 if isinstance(feq, Control):
165 # This tests if the decision rule is age varying.
166 # If it is, this will be a vector with the decision rule for each agent.
167 if isinstance(dr[varn], np.ndarray):
168 ## Now we have to loop through each agent, and apply the decision rule.
169 ## This is quite slow.
170 for i in range(dr[varn].size):
171 vals_i = {
172 var: (
173 vals[var][i]
174 if isinstance(vals[var], np.ndarray)
175 else vals[var]
176 )
177 for var in vals
178 }
179 vals[varn][i] = dr[varn][i](
180 *[vals_i[var] for var in signature(dr[varn][i]).parameters]
181 )
182 else:
183 vals[varn] = dr[varn](
184 *[vals[var] for var in signature(dr[varn]).parameters]
185 ) # TODO: test for signature match with Control
186 else:
187 vals[varn] = feq(*[vals[var] for var in signature(feq).parameters])
189 return vals
192class Block:
193 pass
196@dataclass
197class DBlock(Block):
198 """
199 Represents a 'block' of model behavior.
200 It prioritizes a representation of the dynamics of the block.
201 Control variables are designated by the appropriate dynamic rule.
203 Parameters
204 ----------
205 shocks: Mapping(str, Distribution or tuple)
206 A mapping from variable names to Distribution objects,
207 representing exogenous shocks.
209 Optionally, the mapping can be to tuples of Distribution
210 constructors and dictionary of input arguments.
211 In this case, the dictionary can map argument names to
212 numbers, or to strings. The strings are parsed as
213 mathematical expressions and evaluated in the scope
214 of a calibration dictionary.
216 dynamics: Mapping(str, str or callable)
217 A dictionary mapping variable names to mathematical expressions.
218 These expressions can be simple functions, in which case the
219 argument names should match the variable inputs.
220 Or these can be strings, which are parsed into functions.
222 """
224 name: str = ""
225 description: str = ""
226 shocks: dict = field(default_factory=dict)
227 dynamics: dict = field(default_factory=dict)
228 reward: dict = field(default_factory=dict)
230 def construct_shocks(self, calibration):
231 """
232 Constructs all shocks given calibration.
233 This method mutates the DBlock.
234 """
235 self.shocks = construct_shocks(self.shocks, calibration)
237 def discretize(self, disc_params):
238 """
239 Returns a new DBlock which is a copy of this one, but with shock discretized.
240 """
242 disc_shocks = {}
244 for shockn in self.shocks:
245 if shockn in disc_params:
246 disc_shocks[shockn] = self.shocks[shockn].discretize(
247 **disc_params[shockn]
248 )
249 else:
250 disc_shocks[shockn] = deepcopy(self.shocks[shockn])
252 # replace returns a modified copy
253 new_dblock = replace(self, shocks=disc_shocks)
255 return new_dblock
257 def __post_init__(self):
258 for v in self.dynamics:
259 if isinstance(self.dynamics[v], str):
260 self.dynamics[v] = math_text_to_lambda(self.dynamics[v])
262 for r in self.reward:
263 if isinstance(self.reward[r], str):
264 self.reward[r] = math_text_to_lambda(self.reward[r])
266 def get_shocks(self):
267 return self.shocks
269 def get_dynamics(self):
270 return self.dynamics
272 def get_vars(self):
273 return (
274 list(self.shocks.keys())
275 + list(self.dynamics.keys())
276 + list(self.reward.keys())
277 )
279 def transition(self, pre, dr):
280 """
281 Returns variable values given previous values and decision rule for all controls.
282 """
283 return simulate_dynamics(self.dynamics, pre, dr)
285 def calc_reward(self, vals):
286 """
287 Computes the reward for a given set of variable values
288 """
289 rvals = {}
291 for varn in self.reward:
292 feq = self.reward[varn]
293 rvals[varn] = feq(*[vals[var] for var in signature(feq).parameters])
295 return rvals
297 def get_state_rule_value_function_from_continuation(self, continuation):
298 """
299 Given a continuation value function, returns a state-rule value
300 function: the value for each state and decision rule.
301 This value includes both the reward for executing the rule
302 'this period', and the continuation value of the resulting states.
303 """
305 def state_rule_value_function(pre, dr):
306 vals = self.transition(pre, dr)
307 r = list(self.calc_reward(vals).values())[0] # a hack; to be improved
308 cv = continuation(
309 *[vals[var] for var in signature(continuation).parameters]
310 )
312 return r + cv
314 return state_rule_value_function
316 def get_decision_value_function(self, dr, continuation):
317 """
318 Given a decision rule and a continuation value function,
319 return a function for the value at the decision step/tac,
320 after the shock have been realized.
322 ## TODO: it would be better to systematize these value functions per block
323 ## better, then construct them with 'partial' methods
324 """
325 srvf = self.get_state_rule_value_function_from_continuation(continuation)
327 def decision_value_function(shpre):
328 return srvf(shpre, dr)
330 return decision_value_function
332 def get_arrival_value_function(self, disc_params, dr, continuation):
333 """
334 Returns an arrival value function, which is the value of the states
335 upon arrival into the block.
337 This involves taking an expectation over shocks (which must
338 first be discretized), a decision rule, and a continuation
339 value function.)
340 """
342 def arrival_value_function(arvs):
343 dvf = self.get_decision_value_function(dr, continuation)
345 ds = discretized_shock_dstn(self.shocks, disc_params)
347 arvs_args = [arvs[avn] for avn in arvs]
349 def mod_dvf(shock_value_array):
350 shockvs = {
351 shn: shock_value_array[shn]
352 for i, shn in enumerate(list(ds.variables.keys()))
353 }
355 dvf_args = {}
356 dvf_args.update(arvs)
357 dvf_args.update(shockvs)
359 return dvf(dvf_args)
361 return expected(func=mod_dvf, dist=ds)
363 return arrival_value_function
366@dataclass
367class RBlock(Block):
368 """
369 A recursive block.
371 Parameters
372 ----------
373 ...
374 """
376 name: str = ""
377 description: str = ""
378 blocks: List[Block] = field(default_factory=list)
380 def construct_shocks(self, calibration):
381 """
382 Construct all shocks given a calibration dictionary.
383 """
384 for b in self.blocks:
385 b.construct_shocks(calibration)
387 def discretize(self, disc_params):
388 """
389 Recursively discretizes all the blocks.
390 It replaces any DBlocks with new blocks with discretized shocks.
391 """
392 cbs = copy(self.blocks)
394 for i, b in list(enumerate(cbs)):
395 if isinstance(b, DBlock):
396 nb = b.discretize(disc_params)
397 cbs[i] = nb
398 elif isinstance(b, RBlock):
399 b.discretize(disc_params)
401 # returns a copy of the RBlock with the blocks replaced
402 return replace(self, blocks=cbs)
404 def get_shocks(self):
405 ### TODO: Bug in here is causing AttributeError: 'set' object has no attribute 'draw'
407 super_shocks = {} # uses set to avoid duplicates
409 for b in self.blocks:
410 for k, v in b.get_shocks().items(): # use d.iteritems() in python 2
411 super_shocks[k] = v
413 return super_shocks
415 def get_controls(self):
416 dyn = self.get_dynamics()
418 return [varn for varn in dyn if isinstance(dyn[varn], Control)]
420 def get_dynamics(self):
421 super_dyn = {} # uses set to avoid duplicates
423 for b in self.blocks:
424 for k, v in b.get_dynamics().items(): # use d.iteritems() in python 2
425 super_dyn[k] = v
427 return super_dyn
429 def get_vars(self):
430 return list(self.get_shocks().keys()) + list(self.get_dynamics().keys())