Coverage for HARK/model.py: 97%

148 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-02 05:14 +0000

1""" 

2Tools for crafting models. 

3""" 

4 

5from dataclasses import dataclass, field, replace 

6from copy import copy, deepcopy 

7from HARK.distributions import ( 

8 Distribution, 

9 DiscreteDistributionLabeled, 

10 combine_indep_dstns, 

11 expected, 

12) 

13from inspect import signature 

14import numpy as np 

15from HARK.parser import math_text_to_lambda 

16from typing import Any, Callable, Mapping, List, Union 

17 

18 

19class Aggregate: 

20 """ 

21 Used to designate a shock as an aggregate shock. 

22 If so designated, draws from the shock will be scalar rather 

23 than array valued. 

24 """ 

25 

26 def __init__(self, dist: Distribution): 

27 self.dist = dist 

28 

29 

30class Control: 

31 """ 

32 Used to designate a variabel that is a control variable. 

33 

34 Parameters 

35 ---------- 

36 args : list of str 

37 The labels of the variables that are in the information set of this control. 

38 """ 

39 

40 def __init__(self, args): 

41 pass 

42 

43 

44def discretized_shock_dstn(shocks, disc_params): 

45 """ 

46 Discretizes a collection of independent shocks and combines 

47 them into one DiscreteDistributionLabeled. 

48 

49 Shocks are discretized only if they have a corresponding 

50 element of disc_params defined. 

51 

52 Parameters 

53 ----------- 

54 shocks: dict of Distribution 

55 A dictionary of Distributions, representing independent exogenous shocks. 

56 

57 disc_params: dict of dict 

58 A dictionary of dictionaries with arguments to Distribution.discretize. 

59 Keys of this dictionary should be shared with the shocks argument. 

60 """ 

61 dshocks = {} 

62 

63 for shockn in shocks: 

64 if shockn == "live": # hacky hack 

65 pass 

66 elif shockn in disc_params: 

67 dshocks[shockn] = DiscreteDistributionLabeled.from_unlabeled( 

68 shocks[shockn].discretize(**disc_params[shockn]), var_names=[shockn] 

69 ) 

70 else: 

71 # assume already discrete 

72 dshocks[shockn] = DiscreteDistributionLabeled.from_unlabeled( 

73 shocks[shockn], var_names=[shockn] 

74 ) 

75 

76 all_shock_dstn = combine_indep_dstns(*dshocks.values()) 

77 

78 return all_shock_dstn 

79 

80 

81def construct_shocks(shock_data, scope): 

82 """ 

83 Returns a dictionary from shock labels to Distributions. 

84 

85 When the corresponding value in shock_data contains 

86 a distribution constructor and input information, 

87 any symbolic expressions used in the inputs are 

88 evaluated in the provided scope. 

89 

90 Parameters 

91 ------------ 

92 

93 shock_data: Mapping(str, Distribution or tuple) 

94 A mapping from variable names to Distribution objects, 

95 representing exogenous shocks. 

96 

97 Optionally, the mapping can be to tuples of Distribution 

98 constructors and dictionary of input arguments. 

99 In this case, the dictionary can map argument names to 

100 numbers, or to strings. The strings are parsed as 

101 mathematical expressions and evaluated in the scope 

102 of a calibration dictionary. 

103 

104 scope: dict(str, values) 

105 Variables assigned to numerical values. 

106 The scope in which expressions will be evaluated 

107 """ 

108 sd = deepcopy(shock_data) 

109 

110 for v in sd: 

111 if isinstance(sd[v], tuple): 

112 dist_class = sd[v][0] 

113 

114 dist_args = sd[v][1] # should be a dictionary 

115 

116 for a in dist_args: 

117 if isinstance(dist_args[a], str): 

118 arg_lambda = math_text_to_lambda(dist_args[a]) 

119 arg_value = arg_lambda( 

120 *[scope[var] for var in signature(arg_lambda).parameters] 

121 ) 

122 

123 dist_args[a] = arg_value 

124 

125 dist = dist_class(**dist_args) 

126 

127 sd[v] = dist 

128 

129 return sd 

130 

131 

132def simulate_dynamics( 

133 dynamics: Mapping[str, Union[Callable, Control]], 

134 pre: Mapping[str, Any], 

135 dr: Mapping[str, Callable], 

136): 

137 """ 

138 From the beginning-of-period state (pre), follow the dynamics, 

139 including any decision rules, to compute the end-of-period state. 

140 

141 Parameters 

142 ------------ 

143 

144 dynamics: Mapping[str, Callable] 

145 Maps variable names to functions from variables to values. 

146 Can include Controls 

147 ## TODO: Make collection of equations into a named type 

148 

149 

150 pre : Mapping[str, Any] 

151 Bound values for all variables that must be known before beginning the period's dynamics. 

152 

153 

154 dr : Mapping[str, Callable] 

155 Decision rules for all the Control variables in the dynamics. 

156 """ 

157 vals = pre.copy() 

158 

159 for varn in dynamics: 

160 # Using the fact that Python dictionaries are ordered 

161 

162 feq = dynamics[varn] 

163 

164 if isinstance(feq, Control): 

165 # This tests if the decision rule is age varying. 

166 # If it is, this will be a vector with the decision rule for each agent. 

167 if isinstance(dr[varn], np.ndarray): 

168 ## Now we have to loop through each agent, and apply the decision rule. 

169 ## This is quite slow. 

170 for i in range(dr[varn].size): 

171 vals_i = { 

172 var: ( 

173 vals[var][i] 

174 if isinstance(vals[var], np.ndarray) 

175 else vals[var] 

176 ) 

177 for var in vals 

178 } 

179 vals[varn][i] = dr[varn][i]( 

180 *[vals_i[var] for var in signature(dr[varn][i]).parameters] 

181 ) 

182 else: 

183 vals[varn] = dr[varn]( 

184 *[vals[var] for var in signature(dr[varn]).parameters] 

185 ) # TODO: test for signature match with Control 

186 else: 

187 vals[varn] = feq(*[vals[var] for var in signature(feq).parameters]) 

188 

189 return vals 

190 

191 

192class Block: 

193 pass 

194 

195 

196@dataclass 

197class DBlock(Block): 

198 """ 

199 Represents a 'block' of model behavior. 

200 It prioritizes a representation of the dynamics of the block. 

201 Control variables are designated by the appropriate dynamic rule. 

202 

203 Parameters 

204 ---------- 

205 shocks: Mapping(str, Distribution or tuple) 

206 A mapping from variable names to Distribution objects, 

207 representing exogenous shocks. 

208 

209 Optionally, the mapping can be to tuples of Distribution 

210 constructors and dictionary of input arguments. 

211 In this case, the dictionary can map argument names to 

212 numbers, or to strings. The strings are parsed as 

213 mathematical expressions and evaluated in the scope 

214 of a calibration dictionary. 

215 

216 dynamics: Mapping(str, str or callable) 

217 A dictionary mapping variable names to mathematical expressions. 

218 These expressions can be simple functions, in which case the 

219 argument names should match the variable inputs. 

220 Or these can be strings, which are parsed into functions. 

221 

222 """ 

223 

224 name: str = "" 

225 description: str = "" 

226 shocks: dict = field(default_factory=dict) 

227 dynamics: dict = field(default_factory=dict) 

228 reward: dict = field(default_factory=dict) 

229 

230 def construct_shocks(self, calibration): 

231 """ 

232 Constructs all shocks given calibration. 

233 This method mutates the DBlock. 

234 """ 

235 self.shocks = construct_shocks(self.shocks, calibration) 

236 

237 def discretize(self, disc_params): 

238 """ 

239 Returns a new DBlock which is a copy of this one, but with shock discretized. 

240 """ 

241 

242 disc_shocks = {} 

243 

244 for shockn in self.shocks: 

245 if shockn in disc_params: 

246 disc_shocks[shockn] = self.shocks[shockn].discretize( 

247 **disc_params[shockn] 

248 ) 

249 else: 

250 disc_shocks[shockn] = deepcopy(self.shocks[shockn]) 

251 

252 # replace returns a modified copy 

253 new_dblock = replace(self, shocks=disc_shocks) 

254 

255 return new_dblock 

256 

257 def __post_init__(self): 

258 for v in self.dynamics: 

259 if isinstance(self.dynamics[v], str): 

260 self.dynamics[v] = math_text_to_lambda(self.dynamics[v]) 

261 

262 for r in self.reward: 

263 if isinstance(self.reward[r], str): 

264 self.reward[r] = math_text_to_lambda(self.reward[r]) 

265 

266 def get_shocks(self): 

267 return self.shocks 

268 

269 def get_dynamics(self): 

270 return self.dynamics 

271 

272 def get_vars(self): 

273 return ( 

274 list(self.shocks.keys()) 

275 + list(self.dynamics.keys()) 

276 + list(self.reward.keys()) 

277 ) 

278 

279 def transition(self, pre, dr): 

280 """ 

281 Returns variable values given previous values and decision rule for all controls. 

282 """ 

283 return simulate_dynamics(self.dynamics, pre, dr) 

284 

285 def calc_reward(self, vals): 

286 """ 

287 Computes the reward for a given set of variable values 

288 """ 

289 rvals = {} 

290 

291 for varn in self.reward: 

292 feq = self.reward[varn] 

293 rvals[varn] = feq(*[vals[var] for var in signature(feq).parameters]) 

294 

295 return rvals 

296 

297 def get_state_rule_value_function_from_continuation(self, continuation): 

298 """ 

299 Given a continuation value function, returns a state-rule value 

300 function: the value for each state and decision rule. 

301 This value includes both the reward for executing the rule 

302 'this period', and the continuation value of the resulting states. 

303 """ 

304 

305 def state_rule_value_function(pre, dr): 

306 vals = self.transition(pre, dr) 

307 r = list(self.calc_reward(vals).values())[0] # a hack; to be improved 

308 cv = continuation( 

309 *[vals[var] for var in signature(continuation).parameters] 

310 ) 

311 

312 return r + cv 

313 

314 return state_rule_value_function 

315 

316 def get_decision_value_function(self, dr, continuation): 

317 """ 

318 Given a decision rule and a continuation value function, 

319 return a function for the value at the decision step/tac, 

320 after the shock have been realized. 

321 

322 ## TODO: it would be better to systematize these value functions per block 

323 ## better, then construct them with 'partial' methods 

324 """ 

325 srvf = self.get_state_rule_value_function_from_continuation(continuation) 

326 

327 def decision_value_function(shpre): 

328 return srvf(shpre, dr) 

329 

330 return decision_value_function 

331 

332 def get_arrival_value_function(self, disc_params, dr, continuation): 

333 """ 

334 Returns an arrival value function, which is the value of the states 

335 upon arrival into the block. 

336 

337 This involves taking an expectation over shocks (which must 

338 first be discretized), a decision rule, and a continuation 

339 value function.) 

340 """ 

341 

342 def arrival_value_function(arvs): 

343 dvf = self.get_decision_value_function(dr, continuation) 

344 

345 ds = discretized_shock_dstn(self.shocks, disc_params) 

346 

347 arvs_args = [arvs[avn] for avn in arvs] 

348 

349 def mod_dvf(shock_value_array): 

350 shockvs = { 

351 shn: shock_value_array[shn] 

352 for i, shn in enumerate(list(ds.variables.keys())) 

353 } 

354 

355 dvf_args = {} 

356 dvf_args.update(arvs) 

357 dvf_args.update(shockvs) 

358 

359 return dvf(dvf_args) 

360 

361 return expected(func=mod_dvf, dist=ds) 

362 

363 return arrival_value_function 

364 

365 

366@dataclass 

367class RBlock(Block): 

368 """ 

369 A recursive block. 

370 

371 Parameters 

372 ---------- 

373 ... 

374 """ 

375 

376 name: str = "" 

377 description: str = "" 

378 blocks: List[Block] = field(default_factory=list) 

379 

380 def construct_shocks(self, calibration): 

381 """ 

382 Construct all shocks given a calibration dictionary. 

383 """ 

384 for b in self.blocks: 

385 b.construct_shocks(calibration) 

386 

387 def discretize(self, disc_params): 

388 """ 

389 Recursively discretizes all the blocks. 

390 It replaces any DBlocks with new blocks with discretized shocks. 

391 """ 

392 cbs = copy(self.blocks) 

393 

394 for i, b in list(enumerate(cbs)): 

395 if isinstance(b, DBlock): 

396 nb = b.discretize(disc_params) 

397 cbs[i] = nb 

398 elif isinstance(b, RBlock): 

399 b.discretize(disc_params) 

400 

401 # returns a copy of the RBlock with the blocks replaced 

402 return replace(self, blocks=cbs) 

403 

404 def get_shocks(self): 

405 ### TODO: Bug in here is causing AttributeError: 'set' object has no attribute 'draw' 

406 

407 super_shocks = {} # uses set to avoid duplicates 

408 

409 for b in self.blocks: 

410 for k, v in b.get_shocks().items(): # use d.iteritems() in python 2 

411 super_shocks[k] = v 

412 

413 return super_shocks 

414 

415 def get_controls(self): 

416 dyn = self.get_dynamics() 

417 

418 return [varn for varn in dyn if isinstance(dyn[varn], Control)] 

419 

420 def get_dynamics(self): 

421 super_dyn = {} # uses set to avoid duplicates 

422 

423 for b in self.blocks: 

424 for k, v in b.get_dynamics().items(): # use d.iteritems() in python 2 

425 super_dyn[k] = v 

426 

427 return super_dyn 

428 

429 def get_vars(self): 

430 return list(self.get_shocks().keys()) + list(self.get_dynamics().keys())