Coverage for HARK/simulation/monte_carlo.py: 97%

219 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-02 05:14 +0000

1""" 

2Functions to support Monte Carlo simulation of models. 

3""" 

4 

5from copy import copy 

6from typing import Mapping, Sequence 

7 

8import numpy as np 

9 

10from HARK.distributions import ( 

11 Distribution, 

12 IndexDistribution, 

13) 

14from HARK.model import Aggregate 

15from HARK.model import DBlock 

16from HARK.model import construct_shocks, simulate_dynamics 

17 

18from HARK.utilities import apply_fun_to_vals 

19 

20 

21def draw_shocks(shocks: Mapping[str, Distribution], conditions: Sequence[int]): 

22 """ 

23 Draw from each shock distribution values, subject to given conditions. 

24 

25 Parameters 

26 ------------ 

27 shocks Mapping[str, Distribution] 

28 A dictionary-like mapping from shock names to distributions from which to draw 

29 

30 conditions: Sequence[int] 

31 An array of conditions, one for each agent. 

32 Typically these will be agent ages. 

33 # TODO: generalize this to wider range of inputs. 

34 

35 Parameters 

36 ------------ 

37 draws : Mapping[str, Sequence] 

38 A mapping from shock names to drawn shock values. 

39 """ 

40 draws = {} 

41 

42 for shock_var in shocks: 

43 shock = shocks[shock_var] 

44 

45 if isinstance(shock, (int, float)): 

46 draws[shock_var] = np.ones(len(conditions)) * shock 

47 elif isinstance(shock, Aggregate): 

48 draws[shock_var] = shock.dist.draw(1)[0] 

49 elif isinstance(shock, IndexDistribution): 

50 draws[shock_var] = shock.draw(conditions) 

51 else: 

52 draws[shock_var] = shock.draw(len(conditions)) 

53 # this is hacky if there are no conditions. 

54 

55 return draws 

56 

57 

58def calibration_by_age(ages, calibration): 

59 """ 

60 Returns calibration for this model, but with vectorized 

61 values which map age-varying values to agent ages. 

62 

63 Parameters 

64 ---------- 

65 ages: np.array 

66 An array of agent ages. 

67 

68 calibration: dict 

69 A calibration dictionary 

70 

71 Returns 

72 -------- 

73 aged_calibration: dict 

74 A dictionary of parameter values. 

75 If a parameter is age-varying, the value is a vector 

76 corresponding to the values for each input age. 

77 """ 

78 

79 def aged_param(ages, p_value): 

80 if isinstance(p_value, (float, int)) or callable(p_value): 

81 return p_value 

82 elif isinstance(p_value, list) and len(p_value) > 1: 

83 pv_array = np.array(p_value) 

84 

85 return np.apply_along_axis(lambda a: pv_array[a], 0, ages) 

86 else: 

87 return np.empty(ages.size) 

88 

89 return {p: aged_param(ages, calibration[p]) for p in calibration} 

90 

91 

92class Simulator: 

93 pass 

94 

95 

96class AgentTypeMonteCarloSimulator(Simulator): 

97 """ 

98 A Monte Carlo simulation engine based on the HARK.core.AgentType framework. 

99 

100 Unlike HARK.core.AgentType, this class does not do any model solving, 

101 and depends on dynamic equations, shocks, and decision rules paased into it. 

102 

103 The purpose of this class is to provide a way to simulate models without 

104 relying on inheritance from the AgentType class. 

105 

106 This simulator makes assumptions about population birth and mortality which 

107 are not generic. All agents are replaced with newborns when they expire. 

108 

109 Parameters 

110 ------------ 

111 

112 calibration: Mapping[str, Any] 

113 

114 block : DBlock 

115 Has shocks, dynamics, and rewards 

116 

117 dr: Mapping[str, Callable] 

118 

119 initial: dict 

120 

121 seed : int 

122 A seed for this instance's random number generator. 

123 

124 Attributes 

125 ---------- 

126 agent_count : int 

127 The number of agents of this type to use in simulation. 

128 

129 T_sim : int 

130 The number of periods to simulate. 

131 """ 

132 

133 state_vars = [] 

134 

135 def __init__( 

136 self, calibration, block: DBlock, dr, initial, seed=0, agent_count=1, T_sim=10 

137 ): 

138 super().__init__() 

139 

140 self.calibration = calibration 

141 self.block = block 

142 

143 # shocks are exogenous (but for age) but can depend on calibration 

144 raw_shocks = block.get_shocks() 

145 self.shocks = construct_shocks(raw_shocks, calibration) 

146 

147 self.dynamics = block.get_dynamics() 

148 self.dr = dr 

149 self.initial = initial 

150 

151 self.seed = seed # NOQA 

152 self.agent_count = agent_count 

153 self.T_sim = T_sim 

154 

155 # changes here from HARK.core.AgentType 

156 self.vars = block.get_vars() 

157 

158 self.vars_now = {v: None for v in self.vars} 

159 self.vars_prev = self.vars_now.copy() 

160 

161 self.read_shocks = False # NOQA 

162 self.shock_history = {} 

163 self.newborn_init_history = {} 

164 self.history = {} 

165 

166 self.reset_rng() # NOQA 

167 

168 def reset_rng(self): 

169 """ 

170 Reset the random number generator for this type. 

171 """ 

172 self.RNG = np.random.default_rng(self.seed) 

173 

174 def initialize_sim(self): 

175 """ 

176 Prepares for a new simulation. Resets the internal random number generator, 

177 makes initial states for all agents (using sim_birth), clears histories of tracked variables. 

178 """ 

179 if self.T_sim <= 0: 

180 raise Exception( 

181 "T_sim represents the largest number of observations " 

182 + "that can be simulated for an agent, and must be a positive number." 

183 ) 

184 

185 self.reset_rng() 

186 self.t_sim = 0 

187 all_agents = np.ones(self.agent_count, dtype=bool) 

188 blank_array = np.empty(self.agent_count) 

189 blank_array[:] = np.nan 

190 for var in self.vars: 

191 if self.vars_now[var] is None: 

192 self.vars_now[var] = copy(blank_array) 

193 

194 self.t_age = np.zeros( 

195 self.agent_count, dtype=int 

196 ) # Number of periods since agent entry 

197 self.t_cycle = np.zeros( 

198 self.agent_count, dtype=int 

199 ) # Which cycle period each agent is on 

200 

201 # Get recorded newborn conditions or initialize blank history. 

202 if self.read_shocks and bool(self.newborn_init_history): 

203 for init_var_name in self.initial: 

204 self.vars_now[init_var_name] = self.newborn_init_history[init_var_name][ 

205 self.t_sim, : 

206 ] 

207 else: 

208 for var_name in self.initial: 

209 self.newborn_init_history[var_name] = ( 

210 np.zeros((self.T_sim, self.agent_count)) + np.nan 

211 ) 

212 

213 self.sim_birth(all_agents) 

214 

215 self.clear_history() 

216 return None 

217 

218 def sim_one_period(self): 

219 """ 

220 Simulates one period for this type. Calls the methods get_mortality(), get_shocks() or 

221 read_shocks, get_states(), get_controls(), and get_poststates(). These should be defined for 

222 AgentType subclasses, except get_mortality (define its components sim_death and sim_birth 

223 instead) and read_shocks. 

224 """ 

225 # Mortality adjusts the agent population 

226 self.get_mortality() # Replace some agents with "newborns" 

227 

228 # state_{t-1} 

229 for var in self.vars: 

230 self.vars_prev[var] = self.vars_now[var] 

231 

232 if isinstance(self.vars_now[var], np.ndarray): 

233 self.vars_now[var] = np.empty(self.agent_count) 

234 self.vars_now[var][:] = np.nan 

235 else: 

236 # Probably an aggregate variable. It may be getting set by the Market. 

237 pass 

238 

239 shocks_now = {} 

240 

241 if self.read_shocks: # If shock histories have been pre-specified, use those 

242 for var_name in self.shocks: 

243 shocks_now[var_name] = self.shock_history[var_name][self.t_sim, :] 

244 else: 

245 ### BIG CHANGES HERE from HARK.core.AgentType 

246 shocks_now = draw_shocks(self.shocks, self.t_age) 

247 

248 pre = calibration_by_age(self.t_age, self.calibration) 

249 

250 pre.update(self.vars_prev) 

251 pre.update(shocks_now) 

252 # Won't work for 3.8: self.parameters | self.vars_prev | shocks_now 

253 

254 # Age-varying decision rules captured here 

255 dr = calibration_by_age(self.t_age, self.dr) 

256 

257 post = simulate_dynamics(self.dynamics, pre, dr) 

258 

259 self.vars_now = post 

260 ### BIG CHANGES HERE 

261 

262 # Advance time for all agents 

263 self.t_age = self.t_age + 1 # Age all consumers by one period 

264 

265 # What will we do with cycles? 

266 # self.t_cycle = self.t_cycle + 1 # Age all consumers within their cycle 

267 # self.t_cycle[ 

268 # self.t_cycle == self.T_cycle 

269 # ] = 0 # Resetting to zero for those who have reached the end 

270 

271 def make_shock_history(self): 

272 """ 

273 Makes a pre-specified history of shocks for the simulation. Shock variables should be named 

274 in self.shock, a mapping from shock names to distributions. This method runs a subset 

275 of the standard simulation loop by simulating only mortality and shocks; each variable named 

276 in shocks is stored in a T_sim x agent_count array in history dictionary self.history[X]. 

277 Automatically sets self.read_shocks to True so that these pre-specified shocks are used for 

278 all subsequent calls to simulate(). 

279 

280 Returns 

281 ------- 

282 shock_history: dict 

283 The subset of simulation history that are the shocks for each agent and time. 

284 """ 

285 # Re-initialize the simulation 

286 self.initialize_sim() 

287 self.simulate() 

288 

289 for shock_name in self.shocks: 

290 self.shock_history[shock_name] = self.history[shock_name] 

291 

292 # Flag that shocks can be read rather than simulated 

293 self.read_shocks = True 

294 self.clear_history() 

295 

296 return self.shock_history 

297 

298 def get_mortality(self): 

299 """ 

300 Simulates mortality or agent turnover. 

301 Agents die when their states `live` is less than or equal to zero. 

302 """ 

303 who_dies = self.vars_now["live"] <= 0 

304 

305 self.sim_birth(who_dies) 

306 

307 self.who_dies = who_dies 

308 return None 

309 

310 def sim_birth(self, which_agents): 

311 """ 

312 Makes new agents for the simulation. Takes a boolean array as an input, indicating which 

313 agent indices are to be "born". Does nothing by default, must be overwritten by a subclass. 

314 

315 Parameters 

316 ---------- 

317 which_agents : np.array(Bool) 

318 Boolean array of size self.agent_count indicating which agents should be "born". 

319 

320 Returns 

321 ------- 

322 None 

323 """ 

324 if self.read_shocks: 

325 t = self.t_sim 

326 initial_vals = { 

327 init_var: self.newborn_init_history[init_var][t, which_agents] 

328 for init_var in self.initial 

329 } 

330 

331 else: 

332 initial_vals = draw_shocks(self.initial, np.zeros(which_agents.sum())) 

333 

334 if np.sum(which_agents) > 0: 

335 for varn in initial_vals: 

336 self.vars_now[varn][which_agents] = initial_vals[varn] 

337 self.newborn_init_history[varn][self.t_sim, which_agents] = ( 

338 initial_vals[varn] 

339 ) 

340 

341 self.t_age[which_agents] = 0 

342 self.t_cycle[which_agents] = 0 

343 

344 def simulate(self, sim_periods=None): 

345 """ 

346 Simulates this agent type for a given number of periods. Defaults to 

347 self.T_sim if no input. 

348 Records histories of attributes named in self.track_vars in 

349 self.history[varname]. 

350 

351 Parameters 

352 ---------- 

353 None 

354 

355 Returns 

356 ------- 

357 history : dict 

358 The history tracked during the simulation. 

359 """ 

360 if not hasattr(self, "t_sim"): 

361 raise Exception( 

362 "It seems that the simulation variables were not initialize before calling " 

363 + "simulate(). Call initialize_sim() to initialize the variables before calling simulate() again." 

364 ) 

365 if sim_periods is not None and self.T_sim < sim_periods: 

366 raise Exception( 

367 "To simulate, sim_periods has to be larger than the maximum data set size " 

368 + "T_sim. Either increase the attribute T_sim of this agent type instance " 

369 + "and call the initialize_sim() method again, or set sim_periods <= T_sim." 

370 ) 

371 

372 # Ignore floating point "errors". Numpy calls it "errors", but really it's excep- 

373 # tions with well-defined answers such as 1.0/0.0 that is np.inf, -1.0/0.0 that is 

374 # -np.inf, np.inf/np.inf is np.nan and so on. 

375 with np.errstate( 

376 divide="ignore", over="ignore", under="ignore", invalid="ignore" 

377 ): 

378 if sim_periods is None: 

379 sim_periods = self.T_sim 

380 

381 for t in range(sim_periods): 

382 self.sim_one_period() 

383 

384 # track all the vars -- shocks and dynamics 

385 for var_name in self.vars: 

386 self.history[var_name][self.t_sim, :] = self.vars_now[var_name] 

387 

388 self.t_sim += 1 

389 

390 return self.history 

391 

392 def clear_history(self): 

393 """ 

394 Clears the histories. 

395 """ 

396 for var_name in self.vars: 

397 self.history[var_name] = np.empty((self.T_sim, self.agent_count)) 

398 self.history[var_name].fill(np.nan) 

399 

400 

401class MonteCarloSimulator(Simulator): 

402 """ 

403 A Monte Carlo simulation engine based. 

404 

405 Unlike the AgentTypeMonteCarloSimulator HARK.core.AgentType, 

406 this class does make any assumptions about aging or mortality. 

407 It operates only on model information passed in as blocks. 

408 

409 It also does not have read_shocks functionality; 

410 it is a strict subset of the AgentTypeMonteCarloSimulator functionality. 

411 

412 Parameters 

413 ------------ 

414 

415 calibration: Mapping[str, Any] 

416 

417 block : DBlock 

418 Has shocks, dynamics, and rewards 

419 

420 dr: Mapping[str, Callable] 

421 

422 initial: dict 

423 

424 seed : int 

425 A seed for this instance's random number generator. 

426 

427 Attributes 

428 ---------- 

429 agent_count : int 

430 The number of agents of this type to use in simulation. 

431 

432 T_sim : int 

433 The number of periods to simulate. 

434 """ 

435 

436 state_vars = [] 

437 

438 def __init__( 

439 self, calibration, block: DBlock, dr, initial, seed=0, agent_count=1, T_sim=10 

440 ): 

441 super().__init__() 

442 

443 self.calibration = calibration 

444 self.block = block 

445 

446 # shocks are exogenous (but for age) but can depend on calibration 

447 raw_shocks = block.get_shocks() 

448 self.shocks = construct_shocks(raw_shocks, calibration) 

449 

450 self.dynamics = block.get_dynamics() 

451 self.dr = dr 

452 self.initial = initial 

453 

454 self.seed = seed # NOQA 

455 self.agent_count = agent_count # TODO: pass this in at block level 

456 self.T_sim = T_sim 

457 

458 # changes here from HARK.core.AgentType 

459 self.vars = block.get_vars() 

460 

461 self.vars_now = {v: None for v in self.vars} 

462 self.vars_prev = self.vars_now.copy() 

463 

464 self.shock_history = {} 

465 self.newborn_init_history = {} 

466 self.history = {} 

467 

468 self.reset_rng() # NOQA 

469 

470 def reset_rng(self): 

471 """ 

472 Reset the random number generator for this type. 

473 """ 

474 self.RNG = np.random.default_rng(self.seed) 

475 

476 def initialize_sim(self): 

477 """ 

478 Prepares for a new simulation. Resets the internal random number generator, 

479 makes initial states for all agents (using sim_birth), clears histories of tracked variables. 

480 """ 

481 if self.T_sim <= 0: 

482 raise Exception( 

483 "T_sim represents the largest number of observations " 

484 + "that can be simulated for an agent, and must be a positive number." 

485 ) 

486 

487 self.reset_rng() 

488 self.t_sim = 0 

489 all_agents = np.ones(self.agent_count, dtype=bool) 

490 blank_array = np.empty(self.agent_count) 

491 blank_array[:] = np.nan 

492 for var in self.vars: 

493 if self.vars_now[var] is None: 

494 self.vars_now[var] = copy(blank_array) 

495 

496 self.t_cycle = np.zeros( 

497 self.agent_count, dtype=int 

498 ) # Which cycle period each agent is on 

499 

500 for var_name in self.initial: 

501 self.newborn_init_history[var_name] = ( 

502 np.zeros((self.T_sim, self.agent_count)) + np.nan 

503 ) 

504 

505 self.sim_birth(all_agents) 

506 

507 self.clear_history() 

508 return None 

509 

510 def sim_one_period(self): 

511 """ 

512 Simulates one period for this type. Calls the methods get_mortality(), get_shocks() or 

513 read_shocks, get_states(), get_controls(), and get_poststates(). These should be defined for 

514 AgentType subclasses, except get_mortality (define its components sim_death and sim_birth 

515 instead) and read_shocks. 

516 """ 

517 

518 # state_{t-1} 

519 for var in self.vars: 

520 self.vars_prev[var] = self.vars_now[var] 

521 

522 if isinstance(self.vars_now[var], np.ndarray): 

523 self.vars_now[var] = np.empty(self.agent_count) 

524 self.vars_now[var][:] = np.nan 

525 else: 

526 # Probably an aggregate variable. It may be getting set by the Market. 

527 pass 

528 

529 shocks_now = {} 

530 

531 shocks_now = draw_shocks( 

532 self.shocks, 

533 np.zeros(self.agent_count), # TODO: stupid hack to remove age calculations. 

534 # this needs a little more thought 

535 ) 

536 

537 pre = self.calibration.copy() # for AgentTypeMC, this is conditional on age 

538 # TODO: generalize indexing into calibration. 

539 

540 pre.update(self.vars_prev) 

541 pre.update(shocks_now) 

542 

543 # Won't work for 3.8: self.parameters | self.vars_prev | shocks_now 

544 

545 dr = self.dr # AgentTypeMC chooses rule by age; 

546 # that generalizes to age as a DR argument? 

547 

548 post = simulate_dynamics(self.dynamics, pre, dr) 

549 

550 for r in self.block.reward: 

551 post[r] = apply_fun_to_vals(self.block.reward[r], post) 

552 

553 self.vars_now = post 

554 

555 def sim_birth(self, which_agents): 

556 """ 

557 Makes new agents for the simulation. Takes a boolean array as an input, indicating which 

558 agent indices are to be "born". Does nothing by default, must be overwritten by a subclass. 

559 

560 Parameters 

561 ---------- 

562 which_agents : np.array(Bool) 

563 Boolean array of size self.agent_count indicating which agents should be "born". 

564 

565 Returns 

566 ------- 

567 None 

568 """ 

569 

570 initial_vals = draw_shocks(self.initial, np.zeros(which_agents.sum())) 

571 

572 if np.sum(which_agents) > 0: 

573 for varn in initial_vals: 

574 self.vars_now[varn][which_agents] = initial_vals[varn] 

575 self.newborn_init_history[varn][self.t_sim, which_agents] = ( 

576 initial_vals[varn] 

577 ) 

578 

579 def simulate(self, sim_periods=None): 

580 """ 

581 Simulates this agent type for a given number of periods. Defaults to 

582 self.T_sim if no input. 

583 

584 Records histories of attributes named in self.track_vars in 

585 self.history[varname]. 

586 

587 Parameters 

588 ---------- 

589 sim_periods : int 

590 Number of periods to simulate. 

591 

592 Returns 

593 ------- 

594 history : dict 

595 The history tracked during the simulation. 

596 """ 

597 if not hasattr(self, "t_sim"): 

598 raise Exception( 

599 "It seems that the simulation variables were not initialize before calling " 

600 + "simulate(). Call initialize_sim() to initialize the variables before calling simulate() again." 

601 ) 

602 if sim_periods is not None and self.T_sim < sim_periods: 

603 raise Exception( 

604 "To simulate, sim_periods has to be larger than the maximum data set size " 

605 + "T_sim. Either increase the attribute T_sim of this agent type instance " 

606 + "and call the initialize_sim() method again, or set sim_periods <= T_sim." 

607 ) 

608 

609 # Ignore floating point "errors". Numpy calls it "errors", but really it's excep- 

610 # tions with well-defined answers such as 1.0/0.0 that is np.inf, -1.0/0.0 that is 

611 # -np.inf, np.inf/np.inf is np.nan and so on. 

612 with np.errstate( 

613 divide="ignore", over="ignore", under="ignore", invalid="ignore" 

614 ): 

615 if sim_periods is None: 

616 sim_periods = self.T_sim 

617 

618 for t in range(sim_periods): 

619 self.sim_one_period() 

620 

621 # track all the vars -- shocks and dynamics 

622 for var_name in self.vars: 

623 self.history[var_name][self.t_sim, :] = self.vars_now[var_name] 

624 

625 self.t_sim += 1 

626 

627 return self.history 

628 

629 def clear_history(self): 

630 """ 

631 Clears the histories. 

632 """ 

633 for var_name in self.vars: 

634 self.history[var_name] = np.empty((self.T_sim, self.agent_count)) 

635 self.history[var_name].fill(np.nan)