Coverage for HARK / simulation / monte_carlo.py: 97%

197 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-08 05:31 +0000

1""" 

2Functions to support Monte Carlo simulation of models. 

3""" 

4 

5from copy import copy 

6from typing import Mapping, Sequence 

7 

8import numpy as np 

9 

10from HARK.distributions import ( 

11 Distribution, 

12 IndexDistribution, 

13) 

14from HARK.model import Aggregate 

15from HARK.model import DBlock 

16from HARK.model import construct_shocks, simulate_dynamics 

17 

18from HARK.utilities import apply_fun_to_vals 

19 

20 

21def draw_shocks(shocks: Mapping[str, Distribution], conditions: Sequence[int]): 

22 """ 

23 Draw from each shock distribution values, subject to given conditions. 

24 

25 Parameters 

26 ---------- 

27 shocks : Mapping[str, Distribution] 

28 A dictionary-like mapping from shock names to distributions from which to draw. 

29 conditions : Sequence[int] 

30 An array of conditions, one for each agent. 

31 Typically these will be agent ages. 

32 

33 Returns 

34 ------- 

35 draws : Mapping[str, Sequence] 

36 A mapping from shock names to drawn shock values. 

37 """ 

38 draws = {} 

39 

40 for shock_var in shocks: 

41 shock = shocks[shock_var] 

42 

43 if isinstance(shock, (int, float)): 

44 draws[shock_var] = np.ones(len(conditions)) * shock 

45 elif isinstance(shock, Aggregate): 

46 draws[shock_var] = shock.dist.draw(1)[0] 

47 elif isinstance(shock, IndexDistribution): 

48 draws[shock_var] = shock.draw(conditions) 

49 else: 

50 draws[shock_var] = shock.draw(len(conditions)) 

51 # this is hacky if there are no conditions. 

52 

53 return draws 

54 

55 

56def calibration_by_age(ages, calibration): 

57 """ 

58 Returns calibration for this model, but with vectorized 

59 values which map age-varying values to agent ages. 

60 

61 Parameters 

62 ---------- 

63 ages: np.array 

64 An array of agent ages. 

65 

66 calibration: dict 

67 A calibration dictionary 

68 

69 Returns 

70 -------- 

71 aged_calibration: dict 

72 A dictionary of parameter values. 

73 If a parameter is age-varying, the value is a vector 

74 corresponding to the values for each input age. 

75 """ 

76 

77 def aged_param(ages, p_value): 

78 if isinstance(p_value, (float, int)) or callable(p_value): 

79 return p_value 

80 elif isinstance(p_value, list) and len(p_value) > 1: 

81 pv_array = np.array(p_value) 

82 

83 return np.apply_along_axis(lambda a: pv_array[a], 0, ages) 

84 else: 

85 return np.empty(ages.size) 

86 

87 return {p: aged_param(ages, calibration[p]) for p in calibration} 

88 

89 

90class Simulator: 

91 """ 

92 Base class for Monte Carlo simulators. 

93 

94 Subclasses must set the following instance attributes before calling any 

95 of the methods defined here: 

96 

97 self.vars -- list of variable names tracked in the simulation 

98 self.T_sim -- int, number of periods to simulate 

99 self.agent_count -- int, number of agents 

100 self.seed -- int, random seed 

101 self.history -- dict, populated by clear_history / simulate 

102 self.vars_now -- dict, current-period variable values per agent 

103 self.newborn_init_history -- dict, initial values for newborn agents 

104 self.t_sim -- int, current simulation time step 

105 

106 Subclasses must also implement ``sim_one_period``. 

107 """ 

108 

109 def reset_rng(self): 

110 """ 

111 Reset the random number generator for this type. 

112 """ 

113 self.RNG = np.random.default_rng(self.seed) 

114 

115 def clear_history(self): 

116 """ 

117 Clears the histories. 

118 """ 

119 for var_name in self.vars: 

120 self.history[var_name] = np.empty((self.T_sim, self.agent_count)) 

121 self.history[var_name].fill(np.nan) 

122 

123 def _assign_initial_vals(self, which_agents, initial_vals): 

124 """ 

125 Assign drawn initial values to the agents flagged by ``which_agents`` 

126 and record those values in ``newborn_init_history``. 

127 

128 This helper captures the common assignment pattern shared by every 

129 ``sim_birth`` implementation: loop over the variables in 

130 ``initial_vals``, write them into ``self.vars_now``, and store them 

131 in ``self.newborn_init_history`` at the current simulation step. 

132 

133 Parameters 

134 ---------- 

135 which_agents : np.array(bool) 

136 Boolean mask of length ``self.agent_count`` marking the agents 

137 that are being born this period. 

138 

139 initial_vals : dict 

140 Mapping from variable name to an array of drawn initial values, 

141 one entry per newly-born agent (i.e. length equal to 

142 ``which_agents.sum()``). 

143 

144 Returns 

145 ------- 

146 None 

147 """ 

148 if np.sum(which_agents) > 0: 

149 for varn in initial_vals: 

150 self.vars_now[varn][which_agents] = initial_vals[varn] 

151 self.newborn_init_history[varn][self.t_sim, which_agents] = ( 

152 initial_vals[varn] 

153 ) 

154 

155 def simulate(self, sim_periods=None): 

156 """ 

157 Simulate for a given number of periods, defaulting to ``self.T_sim``. 

158 

159 Records histories of attributes named in ``self.vars`` in 

160 ``self.history[var_name]``. 

161 

162 Parameters 

163 ---------- 

164 sim_periods : int, optional 

165 Number of periods to simulate. If ``None``, simulate for 

166 ``self.T_sim`` periods. 

167 

168 Returns 

169 ------- 

170 history : dict 

171 The history tracked during the simulation. 

172 """ 

173 if not hasattr(self, "t_sim"): 

174 raise RuntimeError( 

175 "Simulation variables were not initialized. " 

176 "Call initialize_sim() before simulate()." 

177 ) 

178 if sim_periods is not None and self.T_sim < sim_periods: 

179 raise ValueError( 

180 "sim_periods (" 

181 + str(sim_periods) 

182 + ") exceeds T_sim (" 

183 + str(self.T_sim) 

184 + "). Either increase T_sim and call " 

185 "initialize_sim() again, or set sim_periods <= T_sim." 

186 ) 

187 

188 # Ignore floating point "errors". Numpy calls it "errors", but really it's excep- 

189 # tions with well-defined answers such as 1.0/0.0 that is np.inf, -1.0/0.0 that is 

190 # -np.inf, np.inf/np.inf is np.nan and so on. 

191 with np.errstate( 

192 divide="ignore", over="ignore", under="ignore", invalid="ignore" 

193 ): 

194 if sim_periods is None: 

195 sim_periods = self.T_sim 

196 

197 for t in range(sim_periods): 

198 self.sim_one_period() 

199 

200 # track all the vars -- shocks and dynamics 

201 for var_name in self.vars: 

202 self.history[var_name][self.t_sim, :] = self.vars_now[var_name] 

203 

204 self.t_sim += 1 

205 

206 return self.history 

207 

208 

209class AgentTypeMonteCarloSimulator(Simulator): 

210 """ 

211 A Monte Carlo simulation engine based on the HARK.core.AgentType framework. 

212 

213 Unlike HARK.core.AgentType, this class does not do any model solving, 

214 and depends on dynamic equations, shocks, and decision rules passed into it. 

215 

216 The purpose of this class is to provide a way to simulate models without 

217 relying on inheritance from the AgentType class. 

218 

219 This simulator makes assumptions about population birth and mortality which 

220 are not generic. All agents are replaced with newborns when they expire. 

221 

222 Parameters 

223 ------------ 

224 

225 calibration: Mapping[str, Any] 

226 

227 block : DBlock 

228 Has shocks, dynamics, and rewards 

229 

230 dr: Mapping[str, Callable] 

231 

232 initial: dict 

233 

234 seed : int 

235 A seed for this instance's random number generator. 

236 

237 Attributes 

238 ---------- 

239 agent_count : int 

240 The number of agents of this type to use in simulation. 

241 

242 T_sim : int 

243 The number of periods to simulate. 

244 """ 

245 

246 state_vars = [] 

247 

248 def __init__( 

249 self, calibration, block: DBlock, dr, initial, seed=0, agent_count=1, T_sim=10 

250 ): 

251 super().__init__() 

252 

253 self.calibration = calibration 

254 self.block = block 

255 

256 # shocks are exogenous (but for age) but can depend on calibration 

257 raw_shocks = block.get_shocks() 

258 self.shocks = construct_shocks(raw_shocks, calibration) 

259 

260 self.dynamics = block.get_dynamics() 

261 self.dr = dr 

262 self.initial = initial 

263 

264 self.seed = seed # NOQA 

265 self.agent_count = agent_count 

266 self.T_sim = T_sim 

267 

268 # changes here from HARK.core.AgentType 

269 self.vars = block.get_vars() 

270 

271 self.vars_now = {v: None for v in self.vars} 

272 self.vars_prev = self.vars_now.copy() 

273 

274 self.read_shocks = False # NOQA 

275 self.shock_history = {} 

276 self.newborn_init_history = {} 

277 self.history = {} 

278 

279 self.reset_rng() # NOQA 

280 

281 def initialize_sim(self): 

282 """ 

283 Prepares for a new simulation. Resets the internal random number generator, 

284 makes initial states for all agents (using sim_birth), clears histories of tracked variables. 

285 """ 

286 if self.T_sim <= 0: 

287 raise Exception( 

288 "T_sim represents the largest number of observations " 

289 + "that can be simulated for an agent, and must be a positive number." 

290 ) 

291 

292 self.reset_rng() 

293 self.t_sim = 0 

294 all_agents = np.ones(self.agent_count, dtype=bool) 

295 blank_array = np.empty(self.agent_count) 

296 blank_array[:] = np.nan 

297 for var in self.vars: 

298 if self.vars_now[var] is None: 

299 self.vars_now[var] = copy(blank_array) 

300 

301 self.t_age = np.zeros( 

302 self.agent_count, dtype=int 

303 ) # Number of periods since agent entry 

304 self.t_cycle = np.zeros( 

305 self.agent_count, dtype=int 

306 ) # Which cycle period each agent is on 

307 

308 # Get recorded newborn conditions or initialize blank history. 

309 if self.read_shocks and bool(self.newborn_init_history): 

310 for init_var_name in self.initial: 

311 self.vars_now[init_var_name] = self.newborn_init_history[init_var_name][ 

312 self.t_sim, : 

313 ] 

314 else: 

315 for var_name in self.initial: 

316 self.newborn_init_history[var_name] = ( 

317 np.zeros((self.T_sim, self.agent_count)) + np.nan 

318 ) 

319 

320 self.sim_birth(all_agents) 

321 

322 self.clear_history() 

323 return None 

324 

325 def sim_one_period(self): 

326 """ 

327 Simulates one period for this type. Calls the methods get_mortality(), get_shocks() or 

328 read_shocks, get_states(), get_controls(), and get_poststates(). These should be defined for 

329 AgentType subclasses, except get_mortality (define its components sim_death and sim_birth 

330 instead) and read_shocks. 

331 """ 

332 # Mortality adjusts the agent population 

333 self.get_mortality() # Replace some agents with "newborns" 

334 

335 # state_{t-1} 

336 for var in self.vars: 

337 self.vars_prev[var] = self.vars_now[var] 

338 

339 if isinstance(self.vars_now[var], np.ndarray): 

340 self.vars_now[var] = np.empty(self.agent_count) 

341 self.vars_now[var][:] = np.nan 

342 else: 

343 # Probably an aggregate variable. It may be getting set by the Market. 

344 pass 

345 

346 shocks_now = {} 

347 

348 if self.read_shocks: # If shock histories have been pre-specified, use those 

349 for var_name in self.shocks: 

350 shocks_now[var_name] = self.shock_history[var_name][self.t_sim, :] 

351 else: 

352 shocks_now = draw_shocks(self.shocks, self.t_age) 

353 

354 pre = calibration_by_age(self.t_age, self.calibration) 

355 

356 pre.update(self.vars_prev) 

357 pre.update(shocks_now) 

358 # Won't work for 3.8: self.parameters | self.vars_prev | shocks_now 

359 

360 # Age-varying decision rules captured here 

361 dr = calibration_by_age(self.t_age, self.dr) 

362 

363 post = simulate_dynamics(self.dynamics, pre, dr) 

364 

365 self.vars_now = post 

366 

367 # Advance time for all agents 

368 self.t_age = self.t_age + 1 # Age all consumers by one period 

369 

370 # What will we do with cycles? 

371 # self.t_cycle = self.t_cycle + 1 # Age all consumers within their cycle 

372 # self.t_cycle[ 

373 # self.t_cycle == self.T_cycle 

374 # ] = 0 # Resetting to zero for those who have reached the end 

375 

376 def make_shock_history(self): 

377 """ 

378 Makes a pre-specified history of shocks for the simulation. Shock variables should be named 

379 in self.shock, a mapping from shock names to distributions. This method runs a subset 

380 of the standard simulation loop by simulating only mortality and shocks; each variable named 

381 in shocks is stored in a T_sim x agent_count array in history dictionary self.history[X]. 

382 Automatically sets self.read_shocks to True so that these pre-specified shocks are used for 

383 all subsequent calls to simulate(). 

384 

385 Returns 

386 ------- 

387 shock_history: dict 

388 The subset of simulation history that are the shocks for each agent and time. 

389 """ 

390 # Re-initialize the simulation 

391 self.initialize_sim() 

392 self.simulate() 

393 

394 for shock_name in self.shocks: 

395 self.shock_history[shock_name] = self.history[shock_name] 

396 

397 # Flag that shocks can be read rather than simulated 

398 self.read_shocks = True 

399 self.clear_history() 

400 

401 return self.shock_history 

402 

403 def get_mortality(self): 

404 """ 

405 Simulates mortality or agent turnover. 

406 Agents die when their states `live` is less than or equal to zero. 

407 """ 

408 who_dies = self.vars_now["live"] <= 0 

409 

410 self.sim_birth(who_dies) 

411 

412 self.who_dies = who_dies 

413 return None 

414 

415 def sim_birth(self, which_agents): 

416 """ 

417 Makes new agents for the simulation. Takes a boolean array as an input, indicating which 

418 agent indices are to be "born". Does nothing by default, must be overwritten by a subclass. 

419 

420 Parameters 

421 ---------- 

422 which_agents : np.array(Bool) 

423 Boolean array of size self.agent_count indicating which agents should be "born". 

424 

425 Returns 

426 ------- 

427 None 

428 """ 

429 if self.read_shocks: 

430 t = self.t_sim 

431 initial_vals = { 

432 init_var: self.newborn_init_history[init_var][t, which_agents] 

433 for init_var in self.initial 

434 } 

435 

436 else: 

437 initial_vals = draw_shocks(self.initial, np.zeros(which_agents.sum())) 

438 

439 self._assign_initial_vals(which_agents, initial_vals) 

440 

441 self.t_age[which_agents] = 0 

442 self.t_cycle[which_agents] = 0 

443 

444 

445class MonteCarloSimulator(Simulator): 

446 """ 

447 A Monte Carlo simulation engine based. 

448 

449 Unlike the AgentTypeMonteCarloSimulator HARK.core.AgentType, 

450 this class does make any assumptions about aging or mortality. 

451 It operates only on model information passed in as blocks. 

452 

453 It also does not have read_shocks functionality; 

454 it is a strict subset of the AgentTypeMonteCarloSimulator functionality. 

455 

456 Parameters 

457 ------------ 

458 

459 calibration: Mapping[str, Any] 

460 

461 block : DBlock 

462 Has shocks, dynamics, and rewards 

463 

464 dr: Mapping[str, Callable] 

465 

466 initial: dict 

467 

468 seed : int 

469 A seed for this instance's random number generator. 

470 

471 Attributes 

472 ---------- 

473 agent_count : int 

474 The number of agents of this type to use in simulation. 

475 

476 T_sim : int 

477 The number of periods to simulate. 

478 """ 

479 

480 state_vars = [] 

481 

482 def __init__( 

483 self, calibration, block: DBlock, dr, initial, seed=0, agent_count=1, T_sim=10 

484 ): 

485 super().__init__() 

486 

487 self.calibration = calibration 

488 self.block = block 

489 

490 # shocks are exogenous (but for age) but can depend on calibration 

491 raw_shocks = block.get_shocks() 

492 self.shocks = construct_shocks(raw_shocks, calibration) 

493 

494 self.dynamics = block.get_dynamics() 

495 self.dr = dr 

496 self.initial = initial 

497 

498 self.seed = seed # NOQA 

499 self.agent_count = agent_count # TODO: pass this in at block level 

500 self.T_sim = T_sim 

501 

502 # changes here from HARK.core.AgentType 

503 self.vars = block.get_vars() 

504 

505 self.vars_now = {v: None for v in self.vars} 

506 self.vars_prev = self.vars_now.copy() 

507 

508 self.shock_history = {} 

509 self.newborn_init_history = {} 

510 self.history = {} 

511 

512 self.reset_rng() # NOQA 

513 

514 def initialize_sim(self): 

515 """ 

516 Prepares for a new simulation. Resets the internal random number generator, 

517 makes initial states for all agents (using sim_birth), clears histories of tracked variables. 

518 """ 

519 if self.T_sim <= 0: 

520 raise Exception( 

521 "T_sim represents the largest number of observations " 

522 + "that can be simulated for an agent, and must be a positive number." 

523 ) 

524 

525 self.reset_rng() 

526 self.t_sim = 0 

527 all_agents = np.ones(self.agent_count, dtype=bool) 

528 blank_array = np.empty(self.agent_count) 

529 blank_array[:] = np.nan 

530 for var in self.vars: 

531 if self.vars_now[var] is None: 

532 self.vars_now[var] = copy(blank_array) 

533 

534 self.t_cycle = np.zeros( 

535 self.agent_count, dtype=int 

536 ) # Which cycle period each agent is on 

537 

538 for var_name in self.initial: 

539 self.newborn_init_history[var_name] = ( 

540 np.zeros((self.T_sim, self.agent_count)) + np.nan 

541 ) 

542 

543 self.sim_birth(all_agents) 

544 

545 self.clear_history() 

546 return None 

547 

548 def sim_one_period(self): 

549 """ 

550 Simulates one period for this type. Calls the methods get_mortality(), get_shocks() or 

551 read_shocks, get_states(), get_controls(), and get_poststates(). These should be defined for 

552 AgentType subclasses, except get_mortality (define its components sim_death and sim_birth 

553 instead) and read_shocks. 

554 """ 

555 

556 # state_{t-1} 

557 for var in self.vars: 

558 self.vars_prev[var] = self.vars_now[var] 

559 

560 if isinstance(self.vars_now[var], np.ndarray): 

561 self.vars_now[var] = np.empty(self.agent_count) 

562 self.vars_now[var][:] = np.nan 

563 else: 

564 # Probably an aggregate variable. It may be getting set by the Market. 

565 pass 

566 

567 shocks_now = {} 

568 

569 shocks_now = draw_shocks( 

570 self.shocks, 

571 np.zeros(self.agent_count), # TODO: stupid hack to remove age calculations. 

572 # this needs a little more thought 

573 ) 

574 

575 pre = self.calibration.copy() # for AgentTypeMC, this is conditional on age 

576 # TODO: generalize indexing into calibration. 

577 

578 pre.update(self.vars_prev) 

579 pre.update(shocks_now) 

580 

581 # Won't work for 3.8: self.parameters | self.vars_prev | shocks_now 

582 

583 dr = self.dr # AgentTypeMC chooses rule by age; 

584 # that generalizes to age as a DR argument? 

585 

586 post = simulate_dynamics(self.dynamics, pre, dr) 

587 

588 for r in self.block.reward: 

589 post[r] = apply_fun_to_vals(self.block.reward[r], post) 

590 

591 self.vars_now = post 

592 

593 def sim_birth(self, which_agents): 

594 """ 

595 Makes new agents for the simulation. Takes a boolean array as an input, indicating which 

596 agent indices are to be "born". Does nothing by default, must be overwritten by a subclass. 

597 

598 Parameters 

599 ---------- 

600 which_agents : np.array(Bool) 

601 Boolean array of size self.agent_count indicating which agents should be "born". 

602 

603 Returns 

604 ------- 

605 None 

606 """ 

607 

608 initial_vals = draw_shocks(self.initial, np.zeros(which_agents.sum())) 

609 

610 self._assign_initial_vals(which_agents, initial_vals)