Coverage for HARK / simulation / monte_carlo.py: 97%
197 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-08 05:31 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-08 05:31 +0000
1"""
2Functions to support Monte Carlo simulation of models.
3"""
5from copy import copy
6from typing import Mapping, Sequence
8import numpy as np
10from HARK.distributions import (
11 Distribution,
12 IndexDistribution,
13)
14from HARK.model import Aggregate
15from HARK.model import DBlock
16from HARK.model import construct_shocks, simulate_dynamics
18from HARK.utilities import apply_fun_to_vals
21def draw_shocks(shocks: Mapping[str, Distribution], conditions: Sequence[int]):
22 """
23 Draw from each shock distribution values, subject to given conditions.
25 Parameters
26 ----------
27 shocks : Mapping[str, Distribution]
28 A dictionary-like mapping from shock names to distributions from which to draw.
29 conditions : Sequence[int]
30 An array of conditions, one for each agent.
31 Typically these will be agent ages.
33 Returns
34 -------
35 draws : Mapping[str, Sequence]
36 A mapping from shock names to drawn shock values.
37 """
38 draws = {}
40 for shock_var in shocks:
41 shock = shocks[shock_var]
43 if isinstance(shock, (int, float)):
44 draws[shock_var] = np.ones(len(conditions)) * shock
45 elif isinstance(shock, Aggregate):
46 draws[shock_var] = shock.dist.draw(1)[0]
47 elif isinstance(shock, IndexDistribution):
48 draws[shock_var] = shock.draw(conditions)
49 else:
50 draws[shock_var] = shock.draw(len(conditions))
51 # this is hacky if there are no conditions.
53 return draws
56def calibration_by_age(ages, calibration):
57 """
58 Returns calibration for this model, but with vectorized
59 values which map age-varying values to agent ages.
61 Parameters
62 ----------
63 ages: np.array
64 An array of agent ages.
66 calibration: dict
67 A calibration dictionary
69 Returns
70 --------
71 aged_calibration: dict
72 A dictionary of parameter values.
73 If a parameter is age-varying, the value is a vector
74 corresponding to the values for each input age.
75 """
77 def aged_param(ages, p_value):
78 if isinstance(p_value, (float, int)) or callable(p_value):
79 return p_value
80 elif isinstance(p_value, list) and len(p_value) > 1:
81 pv_array = np.array(p_value)
83 return np.apply_along_axis(lambda a: pv_array[a], 0, ages)
84 else:
85 return np.empty(ages.size)
87 return {p: aged_param(ages, calibration[p]) for p in calibration}
90class Simulator:
91 """
92 Base class for Monte Carlo simulators.
94 Subclasses must set the following instance attributes before calling any
95 of the methods defined here:
97 self.vars -- list of variable names tracked in the simulation
98 self.T_sim -- int, number of periods to simulate
99 self.agent_count -- int, number of agents
100 self.seed -- int, random seed
101 self.history -- dict, populated by clear_history / simulate
102 self.vars_now -- dict, current-period variable values per agent
103 self.newborn_init_history -- dict, initial values for newborn agents
104 self.t_sim -- int, current simulation time step
106 Subclasses must also implement ``sim_one_period``.
107 """
109 def reset_rng(self):
110 """
111 Reset the random number generator for this type.
112 """
113 self.RNG = np.random.default_rng(self.seed)
115 def clear_history(self):
116 """
117 Clears the histories.
118 """
119 for var_name in self.vars:
120 self.history[var_name] = np.empty((self.T_sim, self.agent_count))
121 self.history[var_name].fill(np.nan)
123 def _assign_initial_vals(self, which_agents, initial_vals):
124 """
125 Assign drawn initial values to the agents flagged by ``which_agents``
126 and record those values in ``newborn_init_history``.
128 This helper captures the common assignment pattern shared by every
129 ``sim_birth`` implementation: loop over the variables in
130 ``initial_vals``, write them into ``self.vars_now``, and store them
131 in ``self.newborn_init_history`` at the current simulation step.
133 Parameters
134 ----------
135 which_agents : np.array(bool)
136 Boolean mask of length ``self.agent_count`` marking the agents
137 that are being born this period.
139 initial_vals : dict
140 Mapping from variable name to an array of drawn initial values,
141 one entry per newly-born agent (i.e. length equal to
142 ``which_agents.sum()``).
144 Returns
145 -------
146 None
147 """
148 if np.sum(which_agents) > 0:
149 for varn in initial_vals:
150 self.vars_now[varn][which_agents] = initial_vals[varn]
151 self.newborn_init_history[varn][self.t_sim, which_agents] = (
152 initial_vals[varn]
153 )
155 def simulate(self, sim_periods=None):
156 """
157 Simulate for a given number of periods, defaulting to ``self.T_sim``.
159 Records histories of attributes named in ``self.vars`` in
160 ``self.history[var_name]``.
162 Parameters
163 ----------
164 sim_periods : int, optional
165 Number of periods to simulate. If ``None``, simulate for
166 ``self.T_sim`` periods.
168 Returns
169 -------
170 history : dict
171 The history tracked during the simulation.
172 """
173 if not hasattr(self, "t_sim"):
174 raise RuntimeError(
175 "Simulation variables were not initialized. "
176 "Call initialize_sim() before simulate()."
177 )
178 if sim_periods is not None and self.T_sim < sim_periods:
179 raise ValueError(
180 "sim_periods ("
181 + str(sim_periods)
182 + ") exceeds T_sim ("
183 + str(self.T_sim)
184 + "). Either increase T_sim and call "
185 "initialize_sim() again, or set sim_periods <= T_sim."
186 )
188 # Ignore floating point "errors". Numpy calls it "errors", but really it's excep-
189 # tions with well-defined answers such as 1.0/0.0 that is np.inf, -1.0/0.0 that is
190 # -np.inf, np.inf/np.inf is np.nan and so on.
191 with np.errstate(
192 divide="ignore", over="ignore", under="ignore", invalid="ignore"
193 ):
194 if sim_periods is None:
195 sim_periods = self.T_sim
197 for t in range(sim_periods):
198 self.sim_one_period()
200 # track all the vars -- shocks and dynamics
201 for var_name in self.vars:
202 self.history[var_name][self.t_sim, :] = self.vars_now[var_name]
204 self.t_sim += 1
206 return self.history
209class AgentTypeMonteCarloSimulator(Simulator):
210 """
211 A Monte Carlo simulation engine based on the HARK.core.AgentType framework.
213 Unlike HARK.core.AgentType, this class does not do any model solving,
214 and depends on dynamic equations, shocks, and decision rules passed into it.
216 The purpose of this class is to provide a way to simulate models without
217 relying on inheritance from the AgentType class.
219 This simulator makes assumptions about population birth and mortality which
220 are not generic. All agents are replaced with newborns when they expire.
222 Parameters
223 ------------
225 calibration: Mapping[str, Any]
227 block : DBlock
228 Has shocks, dynamics, and rewards
230 dr: Mapping[str, Callable]
232 initial: dict
234 seed : int
235 A seed for this instance's random number generator.
237 Attributes
238 ----------
239 agent_count : int
240 The number of agents of this type to use in simulation.
242 T_sim : int
243 The number of periods to simulate.
244 """
246 state_vars = []
248 def __init__(
249 self, calibration, block: DBlock, dr, initial, seed=0, agent_count=1, T_sim=10
250 ):
251 super().__init__()
253 self.calibration = calibration
254 self.block = block
256 # shocks are exogenous (but for age) but can depend on calibration
257 raw_shocks = block.get_shocks()
258 self.shocks = construct_shocks(raw_shocks, calibration)
260 self.dynamics = block.get_dynamics()
261 self.dr = dr
262 self.initial = initial
264 self.seed = seed # NOQA
265 self.agent_count = agent_count
266 self.T_sim = T_sim
268 # changes here from HARK.core.AgentType
269 self.vars = block.get_vars()
271 self.vars_now = {v: None for v in self.vars}
272 self.vars_prev = self.vars_now.copy()
274 self.read_shocks = False # NOQA
275 self.shock_history = {}
276 self.newborn_init_history = {}
277 self.history = {}
279 self.reset_rng() # NOQA
281 def initialize_sim(self):
282 """
283 Prepares for a new simulation. Resets the internal random number generator,
284 makes initial states for all agents (using sim_birth), clears histories of tracked variables.
285 """
286 if self.T_sim <= 0:
287 raise Exception(
288 "T_sim represents the largest number of observations "
289 + "that can be simulated for an agent, and must be a positive number."
290 )
292 self.reset_rng()
293 self.t_sim = 0
294 all_agents = np.ones(self.agent_count, dtype=bool)
295 blank_array = np.empty(self.agent_count)
296 blank_array[:] = np.nan
297 for var in self.vars:
298 if self.vars_now[var] is None:
299 self.vars_now[var] = copy(blank_array)
301 self.t_age = np.zeros(
302 self.agent_count, dtype=int
303 ) # Number of periods since agent entry
304 self.t_cycle = np.zeros(
305 self.agent_count, dtype=int
306 ) # Which cycle period each agent is on
308 # Get recorded newborn conditions or initialize blank history.
309 if self.read_shocks and bool(self.newborn_init_history):
310 for init_var_name in self.initial:
311 self.vars_now[init_var_name] = self.newborn_init_history[init_var_name][
312 self.t_sim, :
313 ]
314 else:
315 for var_name in self.initial:
316 self.newborn_init_history[var_name] = (
317 np.zeros((self.T_sim, self.agent_count)) + np.nan
318 )
320 self.sim_birth(all_agents)
322 self.clear_history()
323 return None
325 def sim_one_period(self):
326 """
327 Simulates one period for this type. Calls the methods get_mortality(), get_shocks() or
328 read_shocks, get_states(), get_controls(), and get_poststates(). These should be defined for
329 AgentType subclasses, except get_mortality (define its components sim_death and sim_birth
330 instead) and read_shocks.
331 """
332 # Mortality adjusts the agent population
333 self.get_mortality() # Replace some agents with "newborns"
335 # state_{t-1}
336 for var in self.vars:
337 self.vars_prev[var] = self.vars_now[var]
339 if isinstance(self.vars_now[var], np.ndarray):
340 self.vars_now[var] = np.empty(self.agent_count)
341 self.vars_now[var][:] = np.nan
342 else:
343 # Probably an aggregate variable. It may be getting set by the Market.
344 pass
346 shocks_now = {}
348 if self.read_shocks: # If shock histories have been pre-specified, use those
349 for var_name in self.shocks:
350 shocks_now[var_name] = self.shock_history[var_name][self.t_sim, :]
351 else:
352 shocks_now = draw_shocks(self.shocks, self.t_age)
354 pre = calibration_by_age(self.t_age, self.calibration)
356 pre.update(self.vars_prev)
357 pre.update(shocks_now)
358 # Won't work for 3.8: self.parameters | self.vars_prev | shocks_now
360 # Age-varying decision rules captured here
361 dr = calibration_by_age(self.t_age, self.dr)
363 post = simulate_dynamics(self.dynamics, pre, dr)
365 self.vars_now = post
367 # Advance time for all agents
368 self.t_age = self.t_age + 1 # Age all consumers by one period
370 # What will we do with cycles?
371 # self.t_cycle = self.t_cycle + 1 # Age all consumers within their cycle
372 # self.t_cycle[
373 # self.t_cycle == self.T_cycle
374 # ] = 0 # Resetting to zero for those who have reached the end
376 def make_shock_history(self):
377 """
378 Makes a pre-specified history of shocks for the simulation. Shock variables should be named
379 in self.shock, a mapping from shock names to distributions. This method runs a subset
380 of the standard simulation loop by simulating only mortality and shocks; each variable named
381 in shocks is stored in a T_sim x agent_count array in history dictionary self.history[X].
382 Automatically sets self.read_shocks to True so that these pre-specified shocks are used for
383 all subsequent calls to simulate().
385 Returns
386 -------
387 shock_history: dict
388 The subset of simulation history that are the shocks for each agent and time.
389 """
390 # Re-initialize the simulation
391 self.initialize_sim()
392 self.simulate()
394 for shock_name in self.shocks:
395 self.shock_history[shock_name] = self.history[shock_name]
397 # Flag that shocks can be read rather than simulated
398 self.read_shocks = True
399 self.clear_history()
401 return self.shock_history
403 def get_mortality(self):
404 """
405 Simulates mortality or agent turnover.
406 Agents die when their states `live` is less than or equal to zero.
407 """
408 who_dies = self.vars_now["live"] <= 0
410 self.sim_birth(who_dies)
412 self.who_dies = who_dies
413 return None
415 def sim_birth(self, which_agents):
416 """
417 Makes new agents for the simulation. Takes a boolean array as an input, indicating which
418 agent indices are to be "born". Does nothing by default, must be overwritten by a subclass.
420 Parameters
421 ----------
422 which_agents : np.array(Bool)
423 Boolean array of size self.agent_count indicating which agents should be "born".
425 Returns
426 -------
427 None
428 """
429 if self.read_shocks:
430 t = self.t_sim
431 initial_vals = {
432 init_var: self.newborn_init_history[init_var][t, which_agents]
433 for init_var in self.initial
434 }
436 else:
437 initial_vals = draw_shocks(self.initial, np.zeros(which_agents.sum()))
439 self._assign_initial_vals(which_agents, initial_vals)
441 self.t_age[which_agents] = 0
442 self.t_cycle[which_agents] = 0
445class MonteCarloSimulator(Simulator):
446 """
447 A Monte Carlo simulation engine based.
449 Unlike the AgentTypeMonteCarloSimulator HARK.core.AgentType,
450 this class does make any assumptions about aging or mortality.
451 It operates only on model information passed in as blocks.
453 It also does not have read_shocks functionality;
454 it is a strict subset of the AgentTypeMonteCarloSimulator functionality.
456 Parameters
457 ------------
459 calibration: Mapping[str, Any]
461 block : DBlock
462 Has shocks, dynamics, and rewards
464 dr: Mapping[str, Callable]
466 initial: dict
468 seed : int
469 A seed for this instance's random number generator.
471 Attributes
472 ----------
473 agent_count : int
474 The number of agents of this type to use in simulation.
476 T_sim : int
477 The number of periods to simulate.
478 """
480 state_vars = []
482 def __init__(
483 self, calibration, block: DBlock, dr, initial, seed=0, agent_count=1, T_sim=10
484 ):
485 super().__init__()
487 self.calibration = calibration
488 self.block = block
490 # shocks are exogenous (but for age) but can depend on calibration
491 raw_shocks = block.get_shocks()
492 self.shocks = construct_shocks(raw_shocks, calibration)
494 self.dynamics = block.get_dynamics()
495 self.dr = dr
496 self.initial = initial
498 self.seed = seed # NOQA
499 self.agent_count = agent_count # TODO: pass this in at block level
500 self.T_sim = T_sim
502 # changes here from HARK.core.AgentType
503 self.vars = block.get_vars()
505 self.vars_now = {v: None for v in self.vars}
506 self.vars_prev = self.vars_now.copy()
508 self.shock_history = {}
509 self.newborn_init_history = {}
510 self.history = {}
512 self.reset_rng() # NOQA
514 def initialize_sim(self):
515 """
516 Prepares for a new simulation. Resets the internal random number generator,
517 makes initial states for all agents (using sim_birth), clears histories of tracked variables.
518 """
519 if self.T_sim <= 0:
520 raise Exception(
521 "T_sim represents the largest number of observations "
522 + "that can be simulated for an agent, and must be a positive number."
523 )
525 self.reset_rng()
526 self.t_sim = 0
527 all_agents = np.ones(self.agent_count, dtype=bool)
528 blank_array = np.empty(self.agent_count)
529 blank_array[:] = np.nan
530 for var in self.vars:
531 if self.vars_now[var] is None:
532 self.vars_now[var] = copy(blank_array)
534 self.t_cycle = np.zeros(
535 self.agent_count, dtype=int
536 ) # Which cycle period each agent is on
538 for var_name in self.initial:
539 self.newborn_init_history[var_name] = (
540 np.zeros((self.T_sim, self.agent_count)) + np.nan
541 )
543 self.sim_birth(all_agents)
545 self.clear_history()
546 return None
548 def sim_one_period(self):
549 """
550 Simulates one period for this type. Calls the methods get_mortality(), get_shocks() or
551 read_shocks, get_states(), get_controls(), and get_poststates(). These should be defined for
552 AgentType subclasses, except get_mortality (define its components sim_death and sim_birth
553 instead) and read_shocks.
554 """
556 # state_{t-1}
557 for var in self.vars:
558 self.vars_prev[var] = self.vars_now[var]
560 if isinstance(self.vars_now[var], np.ndarray):
561 self.vars_now[var] = np.empty(self.agent_count)
562 self.vars_now[var][:] = np.nan
563 else:
564 # Probably an aggregate variable. It may be getting set by the Market.
565 pass
567 shocks_now = {}
569 shocks_now = draw_shocks(
570 self.shocks,
571 np.zeros(self.agent_count), # TODO: stupid hack to remove age calculations.
572 # this needs a little more thought
573 )
575 pre = self.calibration.copy() # for AgentTypeMC, this is conditional on age
576 # TODO: generalize indexing into calibration.
578 pre.update(self.vars_prev)
579 pre.update(shocks_now)
581 # Won't work for 3.8: self.parameters | self.vars_prev | shocks_now
583 dr = self.dr # AgentTypeMC chooses rule by age;
584 # that generalizes to age as a DR argument?
586 post = simulate_dynamics(self.dynamics, pre, dr)
588 for r in self.block.reward:
589 post[r] = apply_fun_to_vals(self.block.reward[r], post)
591 self.vars_now = post
593 def sim_birth(self, which_agents):
594 """
595 Makes new agents for the simulation. Takes a boolean array as an input, indicating which
596 agent indices are to be "born". Does nothing by default, must be overwritten by a subclass.
598 Parameters
599 ----------
600 which_agents : np.array(Bool)
601 Boolean array of size self.agent_count indicating which agents should be "born".
603 Returns
604 -------
605 None
606 """
608 initial_vals = draw_shocks(self.initial, np.zeros(which_agents.sum()))
610 self._assign_initial_vals(which_agents, initial_vals)