Coverage for HARK / simulation / monte_carlo.py: 98%

178 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-10 06:19 +0000

1""" 

2Functions to support Monte Carlo simulation of models. 

3""" 

4 

5from copy import copy 

6from typing import Mapping, Sequence 

7 

8import numpy as np 

9 

10from HARK.distributions import ( 

11 Distribution, 

12 IndexDistribution, 

13) 

14from HARK.model import Aggregate 

15from HARK.model import DBlock 

16from HARK.model import construct_shocks, simulate_dynamics 

17 

18from HARK.utilities import apply_fun_to_vals 

19 

20 

21def draw_shocks(shocks: Mapping[str, Distribution], conditions: Sequence[int]): 

22 """ 

23 Draw from each shock distribution values, subject to given conditions. 

24 

25 Parameters 

26 ---------- 

27 shocks : Mapping[str, Distribution] 

28 A dictionary-like mapping from shock names to distributions from which to draw. 

29 conditions : Sequence[int] 

30 An array of conditions, one for each agent. 

31 Typically these will be agent ages. 

32 

33 Returns 

34 ------- 

35 draws : Mapping[str, Sequence] 

36 A mapping from shock names to drawn shock values. 

37 """ 

38 draws = {} 

39 

40 for shock_var in shocks: 

41 shock = shocks[shock_var] 

42 

43 if isinstance(shock, (int, float)): 

44 draws[shock_var] = np.ones(len(conditions)) * shock 

45 elif isinstance(shock, Aggregate): 

46 draws[shock_var] = shock.dist.draw(1)[0] 

47 elif isinstance(shock, IndexDistribution): 

48 draws[shock_var] = shock.draw(conditions) 

49 else: 

50 draws[shock_var] = shock.draw(len(conditions)) 

51 # this is hacky if there are no conditions. 

52 

53 return draws 

54 

55 

56def calibration_by_age(ages, calibration): 

57 """ 

58 Returns calibration for this model, but with vectorized 

59 values which map age-varying values to agent ages. 

60 

61 Parameters 

62 ---------- 

63 ages: np.array 

64 An array of agent ages. 

65 

66 calibration: dict 

67 A calibration dictionary 

68 

69 Returns 

70 -------- 

71 aged_calibration: dict 

72 A dictionary of parameter values. 

73 If a parameter is age-varying, the value is a vector 

74 corresponding to the values for each input age. 

75 """ 

76 

77 def aged_param(ages, p_value): 

78 if isinstance(p_value, (float, int)) or callable(p_value): 

79 return p_value 

80 elif isinstance(p_value, list) and len(p_value) > 1: 

81 pv_array = np.array(p_value) 

82 

83 return np.apply_along_axis(lambda a: pv_array[a], 0, ages) 

84 else: 

85 return np.empty(ages.size) 

86 

87 return {p: aged_param(ages, calibration[p]) for p in calibration} 

88 

89 

90class Simulator: 

91 """ 

92 Base class for Monte Carlo simulators. 

93 

94 Subclasses must set the following instance attributes before calling any 

95 of the methods defined here: 

96 

97 self.vars -- list of variable names tracked in the simulation 

98 self.T_sim -- int, number of periods to simulate 

99 self.agent_count -- int, number of agents 

100 self.seed -- int, random seed 

101 self.history -- dict, populated by clear_history / simulate 

102 self.vars_now -- dict, current-period variable values per agent 

103 self.newborn_init_history -- dict, initial values for newborn agents 

104 self.t_sim -- int, current simulation time step 

105 

106 Subclasses must also implement ``sim_one_period``. 

107 """ 

108 

109 def reset_rng(self): 

110 """ 

111 Reset the random number generator for this type. 

112 """ 

113 self.RNG = np.random.default_rng(self.seed) 

114 

115 def clear_history(self): 

116 """ 

117 Clears the histories. 

118 """ 

119 for var_name in self.vars: 

120 self.history[var_name] = np.empty((self.T_sim, self.agent_count)) 

121 self.history[var_name].fill(np.nan) 

122 

123 def _setup_block_state( 

124 self, calibration, block, dr, initial, seed, agent_count, T_sim 

125 ): 

126 """ 

127 Common attribute setup shared by every block-driven Simulator subclass. 

128 

129 Stores the calibration/block/dr/initial inputs, builds the shock 

130 distribution from the block, sizes ``vars_now``/``vars_prev`` from 

131 ``block.get_vars()``, and seeds the simulator's RNG. History dicts 

132 are also initialized. Subclasses set their own subclass-specific 

133 attributes (e.g. ``read_shocks``) before or after this call. 

134 """ 

135 self.calibration = calibration 

136 self.block = block 

137 

138 # Shocks are exogenous (but for age) but can depend on calibration. 

139 raw_shocks = block.get_shocks() 

140 self.shocks = construct_shocks(raw_shocks, calibration) 

141 

142 self.dynamics = block.get_dynamics() 

143 self.dr = dr 

144 self.initial = initial 

145 

146 self.seed = seed # NOQA 

147 self.agent_count = agent_count 

148 self.T_sim = T_sim 

149 

150 self.vars = block.get_vars() 

151 self.vars_now = {v: None for v in self.vars} 

152 self.vars_prev = self.vars_now.copy() 

153 

154 self._init_history_dicts() 

155 self.reset_rng() # NOQA 

156 

157 def _init_newborn_init_history(self): 

158 """Allocate ``newborn_init_history`` arrays for each variable in ``self.initial``.""" 

159 for var_name in self.initial: 

160 self.newborn_init_history[var_name] = ( 

161 np.zeros((self.T_sim, self.agent_count)) + np.nan 

162 ) 

163 

164 def _init_history_dicts(self): 

165 """Allocate empty history dicts shared by every Simulator subclass.""" 

166 self.shock_history = {} 

167 self.newborn_init_history = {} 

168 self.history = {} 

169 

170 def _init_blank_now_vars(self): 

171 """Fill any uninitialized ``vars_now`` entries with NaN arrays.""" 

172 blank_array = np.empty(self.agent_count) 

173 blank_array[:] = np.nan 

174 for var in self.vars: 

175 if self.vars_now[var] is None: 

176 self.vars_now[var] = copy(blank_array) 

177 

178 def _rotate_state_buffers(self): 

179 """Move ``vars_now`` to ``vars_prev`` and reinitialize array buffers to NaN.""" 

180 for var in self.vars: 

181 self.vars_prev[var] = self.vars_now[var] 

182 if isinstance(self.vars_now[var], np.ndarray): 

183 self.vars_now[var] = np.empty(self.agent_count) 

184 self.vars_now[var][:] = np.nan 

185 # else: aggregate variable possibly set by a Market - leave alone 

186 

187 def _start_sim(self): 

188 """Common ``initialize_sim`` prefix: validate, reset RNG, blank buffers, 

189 initialize ``t_cycle``, and return the all-agents mask used for sim_birth.""" 

190 self._validate_T_sim() 

191 self.reset_rng() 

192 self.t_sim = 0 

193 self._init_blank_now_vars() 

194 self.t_cycle = np.zeros(self.agent_count, dtype=int) 

195 return np.ones(self.agent_count, dtype=bool) 

196 

197 def _finish_sim_init(self, all_agents): 

198 """Common ``initialize_sim`` suffix: birth all agents and clear history.""" 

199 self.sim_birth(all_agents) 

200 self.clear_history() 

201 

202 def _validate_T_sim(self): 

203 """Raise if ``T_sim`` was not set to a positive integer.""" 

204 if self.T_sim <= 0: 

205 raise Exception( 

206 "T_sim represents the largest number of observations " 

207 + "that can be simulated for an agent, and must be a positive number." 

208 ) 

209 

210 def _assign_initial_vals(self, which_agents, initial_vals): 

211 """ 

212 Assign drawn initial values to the agents flagged by ``which_agents`` 

213 and record those values in ``newborn_init_history``. 

214 

215 This helper captures the common assignment pattern shared by every 

216 ``sim_birth`` implementation: loop over the variables in 

217 ``initial_vals``, write them into ``self.vars_now``, and store them 

218 in ``self.newborn_init_history`` at the current simulation step. 

219 

220 Parameters 

221 ---------- 

222 which_agents : np.array(bool) 

223 Boolean mask of length ``self.agent_count`` marking the agents 

224 that are being born this period. 

225 

226 initial_vals : dict 

227 Mapping from variable name to an array of drawn initial values, 

228 one entry per newly-born agent (i.e. length equal to 

229 ``which_agents.sum()``). 

230 

231 Returns 

232 ------- 

233 None 

234 """ 

235 if np.sum(which_agents) > 0: 

236 for varn in initial_vals: 

237 self.vars_now[varn][which_agents] = initial_vals[varn] 

238 self.newborn_init_history[varn][self.t_sim, which_agents] = ( 

239 initial_vals[varn] 

240 ) 

241 

242 def simulate(self, sim_periods=None): 

243 """ 

244 Simulate for a given number of periods, defaulting to ``self.T_sim``. 

245 

246 Records histories of attributes named in ``self.vars`` in 

247 ``self.history[var_name]``. 

248 

249 Parameters 

250 ---------- 

251 sim_periods : int, optional 

252 Number of periods to simulate. If ``None``, simulate for 

253 ``self.T_sim`` periods. 

254 

255 Returns 

256 ------- 

257 history : dict 

258 The history tracked during the simulation. 

259 """ 

260 if not hasattr(self, "t_sim"): 

261 raise RuntimeError( 

262 "Simulation variables were not initialized. " 

263 "Call initialize_sim() before simulate()." 

264 ) 

265 if sim_periods is not None and self.T_sim < sim_periods: 

266 raise ValueError( 

267 "sim_periods (" 

268 + str(sim_periods) 

269 + ") exceeds T_sim (" 

270 + str(self.T_sim) 

271 + "). Either increase T_sim and call " 

272 "initialize_sim() again, or set sim_periods <= T_sim." 

273 ) 

274 

275 # Ignore floating point "errors". Numpy calls it "errors", but really it's excep- 

276 # tions with well-defined answers such as 1.0/0.0 that is np.inf, -1.0/0.0 that is 

277 # -np.inf, np.inf/np.inf is np.nan and so on. 

278 with np.errstate( 

279 divide="ignore", over="ignore", under="ignore", invalid="ignore" 

280 ): 

281 if sim_periods is None: 

282 sim_periods = self.T_sim 

283 

284 for t in range(sim_periods): 

285 self.sim_one_period() 

286 

287 # track all the vars -- shocks and dynamics 

288 for var_name in self.vars: 

289 self.history[var_name][self.t_sim, :] = self.vars_now[var_name] 

290 

291 self.t_sim += 1 

292 

293 return self.history 

294 

295 

296class AgentTypeMonteCarloSimulator(Simulator): 

297 """ 

298 A Monte Carlo simulation engine based on the HARK.core.AgentType framework. 

299 

300 Unlike HARK.core.AgentType, this class does not do any model solving, 

301 and depends on dynamic equations, shocks, and decision rules passed into it. 

302 

303 The purpose of this class is to provide a way to simulate models without 

304 relying on inheritance from the AgentType class. 

305 

306 This simulator makes assumptions about population birth and mortality which 

307 are not generic. All agents are replaced with newborns when they expire. 

308 

309 Parameters 

310 ------------ 

311 

312 calibration: Mapping[str, Any] 

313 

314 block : DBlock 

315 Has shocks, dynamics, and rewards 

316 

317 dr: Mapping[str, Callable] 

318 

319 initial: dict 

320 

321 seed : int 

322 A seed for this instance's random number generator. 

323 

324 Attributes 

325 ---------- 

326 agent_count : int 

327 The number of agents of this type to use in simulation. 

328 

329 T_sim : int 

330 The number of periods to simulate. 

331 """ 

332 

333 state_vars = [] 

334 

335 def __init__( 

336 self, calibration, block: DBlock, dr, initial, seed=0, agent_count=1, T_sim=10 

337 ): 

338 super().__init__() 

339 self.read_shocks = False # NOQA -- must precede _setup if used by helpers 

340 self._setup_block_state( 

341 calibration, block, dr, initial, seed, agent_count, T_sim 

342 ) 

343 

344 def initialize_sim(self): 

345 """ 

346 Prepares for a new simulation. Resets the internal random number generator, 

347 makes initial states for all agents (using sim_birth), clears histories of tracked variables. 

348 """ 

349 all_agents = self._start_sim() 

350 self.t_age = np.zeros(self.agent_count, dtype=int) 

351 

352 # Get recorded newborn conditions or initialize blank history. 

353 if self.read_shocks and bool(self.newborn_init_history): 

354 for init_var_name in self.initial: 

355 self.vars_now[init_var_name] = self.newborn_init_history[init_var_name][ 

356 self.t_sim, : 

357 ] 

358 else: 

359 self._init_newborn_init_history() 

360 

361 self._finish_sim_init(all_agents) 

362 return None 

363 

364 def sim_one_period(self): 

365 """ 

366 Simulates one period for this type. Calls the methods get_mortality(), get_shocks() or 

367 read_shocks, get_states(), get_controls(), and get_poststates(). These should be defined for 

368 AgentType subclasses, except get_mortality (define its components sim_death and sim_birth 

369 instead) and read_shocks. 

370 """ 

371 # Mortality adjusts the agent population 

372 self.get_mortality() # Replace some agents with "newborns" 

373 

374 self._rotate_state_buffers() 

375 

376 shocks_now = {} 

377 

378 if self.read_shocks: # If shock histories have been pre-specified, use those 

379 for var_name in self.shocks: 

380 shocks_now[var_name] = self.shock_history[var_name][self.t_sim, :] 

381 else: 

382 shocks_now = draw_shocks(self.shocks, self.t_age) 

383 

384 pre = calibration_by_age(self.t_age, self.calibration) 

385 

386 pre.update(self.vars_prev) 

387 pre.update(shocks_now) 

388 # Won't work for 3.8: self.parameters | self.vars_prev | shocks_now 

389 

390 # Age-varying decision rules captured here 

391 dr = calibration_by_age(self.t_age, self.dr) 

392 

393 post = simulate_dynamics(self.dynamics, pre, dr) 

394 

395 self.vars_now = post 

396 

397 # Advance time for all agents 

398 self.t_age = self.t_age + 1 # Age all consumers by one period 

399 

400 # What will we do with cycles? 

401 # self.t_cycle = self.t_cycle + 1 # Age all consumers within their cycle 

402 # self.t_cycle[ 

403 # self.t_cycle == self.T_cycle 

404 # ] = 0 # Resetting to zero for those who have reached the end 

405 

406 def make_shock_history(self): 

407 """ 

408 Makes a pre-specified history of shocks for the simulation. Shock variables should be named 

409 in self.shock, a mapping from shock names to distributions. This method runs a subset 

410 of the standard simulation loop by simulating only mortality and shocks; each variable named 

411 in shocks is stored in a T_sim x agent_count array in history dictionary self.history[X]. 

412 Automatically sets self.read_shocks to True so that these pre-specified shocks are used for 

413 all subsequent calls to simulate(). 

414 

415 Returns 

416 ------- 

417 shock_history: dict 

418 The subset of simulation history that are the shocks for each agent and time. 

419 """ 

420 # Re-initialize the simulation 

421 self.initialize_sim() 

422 self.simulate() 

423 

424 for shock_name in self.shocks: 

425 self.shock_history[shock_name] = self.history[shock_name] 

426 

427 # Flag that shocks can be read rather than simulated 

428 self.read_shocks = True 

429 self.clear_history() 

430 

431 return self.shock_history 

432 

433 def get_mortality(self): 

434 """ 

435 Simulates mortality or agent turnover. 

436 Agents die when their states `live` is less than or equal to zero. 

437 """ 

438 who_dies = self.vars_now["live"] <= 0 

439 

440 self.sim_birth(who_dies) 

441 

442 self.who_dies = who_dies 

443 return None 

444 

445 def sim_birth(self, which_agents): 

446 """ 

447 Makes new agents for the simulation. Takes a boolean array as an input, indicating which 

448 agent indices are to be "born". Does nothing by default, must be overwritten by a subclass. 

449 

450 Parameters 

451 ---------- 

452 which_agents : np.array(Bool) 

453 Boolean array of size self.agent_count indicating which agents should be "born". 

454 

455 Returns 

456 ------- 

457 None 

458 """ 

459 if self.read_shocks: 

460 t = self.t_sim 

461 initial_vals = { 

462 init_var: self.newborn_init_history[init_var][t, which_agents] 

463 for init_var in self.initial 

464 } 

465 

466 else: 

467 initial_vals = draw_shocks(self.initial, np.zeros(which_agents.sum())) 

468 

469 self._assign_initial_vals(which_agents, initial_vals) 

470 

471 self.t_age[which_agents] = 0 

472 self.t_cycle[which_agents] = 0 

473 

474 

475class MonteCarloSimulator(Simulator): 

476 """ 

477 A Monte Carlo simulation engine based. 

478 

479 Unlike the AgentTypeMonteCarloSimulator HARK.core.AgentType, 

480 this class does make any assumptions about aging or mortality. 

481 It operates only on model information passed in as blocks. 

482 

483 It also does not have read_shocks functionality; 

484 it is a strict subset of the AgentTypeMonteCarloSimulator functionality. 

485 

486 Parameters 

487 ------------ 

488 

489 calibration: Mapping[str, Any] 

490 

491 block : DBlock 

492 Has shocks, dynamics, and rewards 

493 

494 dr: Mapping[str, Callable] 

495 

496 initial: dict 

497 

498 seed : int 

499 A seed for this instance's random number generator. 

500 

501 Attributes 

502 ---------- 

503 agent_count : int 

504 The number of agents of this type to use in simulation. 

505 

506 T_sim : int 

507 The number of periods to simulate. 

508 """ 

509 

510 state_vars = [] 

511 

512 def __init__( 

513 self, calibration, block: DBlock, dr, initial, seed=0, agent_count=1, T_sim=10 

514 ): 

515 super().__init__() 

516 # TODO: pass agent_count in at block level 

517 self._setup_block_state( 

518 calibration, block, dr, initial, seed, agent_count, T_sim 

519 ) 

520 

521 def initialize_sim(self): 

522 """ 

523 Prepares for a new simulation. Resets the internal random number generator, 

524 makes initial states for all agents (using sim_birth), clears histories of tracked variables. 

525 """ 

526 all_agents = self._start_sim() 

527 self._init_newborn_init_history() 

528 self._finish_sim_init(all_agents) 

529 return None 

530 

531 def sim_one_period(self): 

532 """ 

533 Simulates one period for this type. Calls the methods get_mortality(), get_shocks() or 

534 read_shocks, get_states(), get_controls(), and get_poststates(). These should be defined for 

535 AgentType subclasses, except get_mortality (define its components sim_death and sim_birth 

536 instead) and read_shocks. 

537 """ 

538 self._rotate_state_buffers() 

539 

540 shocks_now = draw_shocks( 

541 self.shocks, 

542 np.zeros(self.agent_count), # TODO: stupid hack to remove age calculations. 

543 # this needs a little more thought 

544 ) 

545 

546 pre = self.calibration.copy() # for AgentTypeMC, this is conditional on age 

547 # TODO: generalize indexing into calibration. 

548 

549 pre.update(self.vars_prev) 

550 pre.update(shocks_now) 

551 

552 # Won't work for 3.8: self.parameters | self.vars_prev | shocks_now 

553 

554 dr = self.dr # AgentTypeMC chooses rule by age; 

555 # that generalizes to age as a DR argument? 

556 

557 post = simulate_dynamics(self.dynamics, pre, dr) 

558 

559 for r in self.block.reward: 

560 post[r] = apply_fun_to_vals(self.block.reward[r], post) 

561 

562 self.vars_now = post 

563 

564 def sim_birth(self, which_agents): 

565 """ 

566 Makes new agents for the simulation. Takes a boolean array as an input, indicating which 

567 agent indices are to be "born". Does nothing by default, must be overwritten by a subclass. 

568 

569 Parameters 

570 ---------- 

571 which_agents : np.array(Bool) 

572 Boolean array of size self.agent_count indicating which agents should be "born". 

573 

574 Returns 

575 ------- 

576 None 

577 """ 

578 

579 initial_vals = draw_shocks(self.initial, np.zeros(which_agents.sum())) 

580 

581 self._assign_initial_vals(which_agents, initial_vals)