Coverage for HARK / simulation / monte_carlo.py: 98%
178 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-10 06:19 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-10 06:19 +0000
1"""
2Functions to support Monte Carlo simulation of models.
3"""
5from copy import copy
6from typing import Mapping, Sequence
8import numpy as np
10from HARK.distributions import (
11 Distribution,
12 IndexDistribution,
13)
14from HARK.model import Aggregate
15from HARK.model import DBlock
16from HARK.model import construct_shocks, simulate_dynamics
18from HARK.utilities import apply_fun_to_vals
21def draw_shocks(shocks: Mapping[str, Distribution], conditions: Sequence[int]):
22 """
23 Draw from each shock distribution values, subject to given conditions.
25 Parameters
26 ----------
27 shocks : Mapping[str, Distribution]
28 A dictionary-like mapping from shock names to distributions from which to draw.
29 conditions : Sequence[int]
30 An array of conditions, one for each agent.
31 Typically these will be agent ages.
33 Returns
34 -------
35 draws : Mapping[str, Sequence]
36 A mapping from shock names to drawn shock values.
37 """
38 draws = {}
40 for shock_var in shocks:
41 shock = shocks[shock_var]
43 if isinstance(shock, (int, float)):
44 draws[shock_var] = np.ones(len(conditions)) * shock
45 elif isinstance(shock, Aggregate):
46 draws[shock_var] = shock.dist.draw(1)[0]
47 elif isinstance(shock, IndexDistribution):
48 draws[shock_var] = shock.draw(conditions)
49 else:
50 draws[shock_var] = shock.draw(len(conditions))
51 # this is hacky if there are no conditions.
53 return draws
56def calibration_by_age(ages, calibration):
57 """
58 Returns calibration for this model, but with vectorized
59 values which map age-varying values to agent ages.
61 Parameters
62 ----------
63 ages: np.array
64 An array of agent ages.
66 calibration: dict
67 A calibration dictionary
69 Returns
70 --------
71 aged_calibration: dict
72 A dictionary of parameter values.
73 If a parameter is age-varying, the value is a vector
74 corresponding to the values for each input age.
75 """
77 def aged_param(ages, p_value):
78 if isinstance(p_value, (float, int)) or callable(p_value):
79 return p_value
80 elif isinstance(p_value, list) and len(p_value) > 1:
81 pv_array = np.array(p_value)
83 return np.apply_along_axis(lambda a: pv_array[a], 0, ages)
84 else:
85 return np.empty(ages.size)
87 return {p: aged_param(ages, calibration[p]) for p in calibration}
90class Simulator:
91 """
92 Base class for Monte Carlo simulators.
94 Subclasses must set the following instance attributes before calling any
95 of the methods defined here:
97 self.vars -- list of variable names tracked in the simulation
98 self.T_sim -- int, number of periods to simulate
99 self.agent_count -- int, number of agents
100 self.seed -- int, random seed
101 self.history -- dict, populated by clear_history / simulate
102 self.vars_now -- dict, current-period variable values per agent
103 self.newborn_init_history -- dict, initial values for newborn agents
104 self.t_sim -- int, current simulation time step
106 Subclasses must also implement ``sim_one_period``.
107 """
109 def reset_rng(self):
110 """
111 Reset the random number generator for this type.
112 """
113 self.RNG = np.random.default_rng(self.seed)
115 def clear_history(self):
116 """
117 Clears the histories.
118 """
119 for var_name in self.vars:
120 self.history[var_name] = np.empty((self.T_sim, self.agent_count))
121 self.history[var_name].fill(np.nan)
123 def _setup_block_state(
124 self, calibration, block, dr, initial, seed, agent_count, T_sim
125 ):
126 """
127 Common attribute setup shared by every block-driven Simulator subclass.
129 Stores the calibration/block/dr/initial inputs, builds the shock
130 distribution from the block, sizes ``vars_now``/``vars_prev`` from
131 ``block.get_vars()``, and seeds the simulator's RNG. History dicts
132 are also initialized. Subclasses set their own subclass-specific
133 attributes (e.g. ``read_shocks``) before or after this call.
134 """
135 self.calibration = calibration
136 self.block = block
138 # Shocks are exogenous (but for age) but can depend on calibration.
139 raw_shocks = block.get_shocks()
140 self.shocks = construct_shocks(raw_shocks, calibration)
142 self.dynamics = block.get_dynamics()
143 self.dr = dr
144 self.initial = initial
146 self.seed = seed # NOQA
147 self.agent_count = agent_count
148 self.T_sim = T_sim
150 self.vars = block.get_vars()
151 self.vars_now = {v: None for v in self.vars}
152 self.vars_prev = self.vars_now.copy()
154 self._init_history_dicts()
155 self.reset_rng() # NOQA
157 def _init_newborn_init_history(self):
158 """Allocate ``newborn_init_history`` arrays for each variable in ``self.initial``."""
159 for var_name in self.initial:
160 self.newborn_init_history[var_name] = (
161 np.zeros((self.T_sim, self.agent_count)) + np.nan
162 )
164 def _init_history_dicts(self):
165 """Allocate empty history dicts shared by every Simulator subclass."""
166 self.shock_history = {}
167 self.newborn_init_history = {}
168 self.history = {}
170 def _init_blank_now_vars(self):
171 """Fill any uninitialized ``vars_now`` entries with NaN arrays."""
172 blank_array = np.empty(self.agent_count)
173 blank_array[:] = np.nan
174 for var in self.vars:
175 if self.vars_now[var] is None:
176 self.vars_now[var] = copy(blank_array)
178 def _rotate_state_buffers(self):
179 """Move ``vars_now`` to ``vars_prev`` and reinitialize array buffers to NaN."""
180 for var in self.vars:
181 self.vars_prev[var] = self.vars_now[var]
182 if isinstance(self.vars_now[var], np.ndarray):
183 self.vars_now[var] = np.empty(self.agent_count)
184 self.vars_now[var][:] = np.nan
185 # else: aggregate variable possibly set by a Market - leave alone
187 def _start_sim(self):
188 """Common ``initialize_sim`` prefix: validate, reset RNG, blank buffers,
189 initialize ``t_cycle``, and return the all-agents mask used for sim_birth."""
190 self._validate_T_sim()
191 self.reset_rng()
192 self.t_sim = 0
193 self._init_blank_now_vars()
194 self.t_cycle = np.zeros(self.agent_count, dtype=int)
195 return np.ones(self.agent_count, dtype=bool)
197 def _finish_sim_init(self, all_agents):
198 """Common ``initialize_sim`` suffix: birth all agents and clear history."""
199 self.sim_birth(all_agents)
200 self.clear_history()
202 def _validate_T_sim(self):
203 """Raise if ``T_sim`` was not set to a positive integer."""
204 if self.T_sim <= 0:
205 raise Exception(
206 "T_sim represents the largest number of observations "
207 + "that can be simulated for an agent, and must be a positive number."
208 )
210 def _assign_initial_vals(self, which_agents, initial_vals):
211 """
212 Assign drawn initial values to the agents flagged by ``which_agents``
213 and record those values in ``newborn_init_history``.
215 This helper captures the common assignment pattern shared by every
216 ``sim_birth`` implementation: loop over the variables in
217 ``initial_vals``, write them into ``self.vars_now``, and store them
218 in ``self.newborn_init_history`` at the current simulation step.
220 Parameters
221 ----------
222 which_agents : np.array(bool)
223 Boolean mask of length ``self.agent_count`` marking the agents
224 that are being born this period.
226 initial_vals : dict
227 Mapping from variable name to an array of drawn initial values,
228 one entry per newly-born agent (i.e. length equal to
229 ``which_agents.sum()``).
231 Returns
232 -------
233 None
234 """
235 if np.sum(which_agents) > 0:
236 for varn in initial_vals:
237 self.vars_now[varn][which_agents] = initial_vals[varn]
238 self.newborn_init_history[varn][self.t_sim, which_agents] = (
239 initial_vals[varn]
240 )
242 def simulate(self, sim_periods=None):
243 """
244 Simulate for a given number of periods, defaulting to ``self.T_sim``.
246 Records histories of attributes named in ``self.vars`` in
247 ``self.history[var_name]``.
249 Parameters
250 ----------
251 sim_periods : int, optional
252 Number of periods to simulate. If ``None``, simulate for
253 ``self.T_sim`` periods.
255 Returns
256 -------
257 history : dict
258 The history tracked during the simulation.
259 """
260 if not hasattr(self, "t_sim"):
261 raise RuntimeError(
262 "Simulation variables were not initialized. "
263 "Call initialize_sim() before simulate()."
264 )
265 if sim_periods is not None and self.T_sim < sim_periods:
266 raise ValueError(
267 "sim_periods ("
268 + str(sim_periods)
269 + ") exceeds T_sim ("
270 + str(self.T_sim)
271 + "). Either increase T_sim and call "
272 "initialize_sim() again, or set sim_periods <= T_sim."
273 )
275 # Ignore floating point "errors". Numpy calls it "errors", but really it's excep-
276 # tions with well-defined answers such as 1.0/0.0 that is np.inf, -1.0/0.0 that is
277 # -np.inf, np.inf/np.inf is np.nan and so on.
278 with np.errstate(
279 divide="ignore", over="ignore", under="ignore", invalid="ignore"
280 ):
281 if sim_periods is None:
282 sim_periods = self.T_sim
284 for t in range(sim_periods):
285 self.sim_one_period()
287 # track all the vars -- shocks and dynamics
288 for var_name in self.vars:
289 self.history[var_name][self.t_sim, :] = self.vars_now[var_name]
291 self.t_sim += 1
293 return self.history
296class AgentTypeMonteCarloSimulator(Simulator):
297 """
298 A Monte Carlo simulation engine based on the HARK.core.AgentType framework.
300 Unlike HARK.core.AgentType, this class does not do any model solving,
301 and depends on dynamic equations, shocks, and decision rules passed into it.
303 The purpose of this class is to provide a way to simulate models without
304 relying on inheritance from the AgentType class.
306 This simulator makes assumptions about population birth and mortality which
307 are not generic. All agents are replaced with newborns when they expire.
309 Parameters
310 ------------
312 calibration: Mapping[str, Any]
314 block : DBlock
315 Has shocks, dynamics, and rewards
317 dr: Mapping[str, Callable]
319 initial: dict
321 seed : int
322 A seed for this instance's random number generator.
324 Attributes
325 ----------
326 agent_count : int
327 The number of agents of this type to use in simulation.
329 T_sim : int
330 The number of periods to simulate.
331 """
333 state_vars = []
335 def __init__(
336 self, calibration, block: DBlock, dr, initial, seed=0, agent_count=1, T_sim=10
337 ):
338 super().__init__()
339 self.read_shocks = False # NOQA -- must precede _setup if used by helpers
340 self._setup_block_state(
341 calibration, block, dr, initial, seed, agent_count, T_sim
342 )
344 def initialize_sim(self):
345 """
346 Prepares for a new simulation. Resets the internal random number generator,
347 makes initial states for all agents (using sim_birth), clears histories of tracked variables.
348 """
349 all_agents = self._start_sim()
350 self.t_age = np.zeros(self.agent_count, dtype=int)
352 # Get recorded newborn conditions or initialize blank history.
353 if self.read_shocks and bool(self.newborn_init_history):
354 for init_var_name in self.initial:
355 self.vars_now[init_var_name] = self.newborn_init_history[init_var_name][
356 self.t_sim, :
357 ]
358 else:
359 self._init_newborn_init_history()
361 self._finish_sim_init(all_agents)
362 return None
364 def sim_one_period(self):
365 """
366 Simulates one period for this type. Calls the methods get_mortality(), get_shocks() or
367 read_shocks, get_states(), get_controls(), and get_poststates(). These should be defined for
368 AgentType subclasses, except get_mortality (define its components sim_death and sim_birth
369 instead) and read_shocks.
370 """
371 # Mortality adjusts the agent population
372 self.get_mortality() # Replace some agents with "newborns"
374 self._rotate_state_buffers()
376 shocks_now = {}
378 if self.read_shocks: # If shock histories have been pre-specified, use those
379 for var_name in self.shocks:
380 shocks_now[var_name] = self.shock_history[var_name][self.t_sim, :]
381 else:
382 shocks_now = draw_shocks(self.shocks, self.t_age)
384 pre = calibration_by_age(self.t_age, self.calibration)
386 pre.update(self.vars_prev)
387 pre.update(shocks_now)
388 # Won't work for 3.8: self.parameters | self.vars_prev | shocks_now
390 # Age-varying decision rules captured here
391 dr = calibration_by_age(self.t_age, self.dr)
393 post = simulate_dynamics(self.dynamics, pre, dr)
395 self.vars_now = post
397 # Advance time for all agents
398 self.t_age = self.t_age + 1 # Age all consumers by one period
400 # What will we do with cycles?
401 # self.t_cycle = self.t_cycle + 1 # Age all consumers within their cycle
402 # self.t_cycle[
403 # self.t_cycle == self.T_cycle
404 # ] = 0 # Resetting to zero for those who have reached the end
406 def make_shock_history(self):
407 """
408 Makes a pre-specified history of shocks for the simulation. Shock variables should be named
409 in self.shock, a mapping from shock names to distributions. This method runs a subset
410 of the standard simulation loop by simulating only mortality and shocks; each variable named
411 in shocks is stored in a T_sim x agent_count array in history dictionary self.history[X].
412 Automatically sets self.read_shocks to True so that these pre-specified shocks are used for
413 all subsequent calls to simulate().
415 Returns
416 -------
417 shock_history: dict
418 The subset of simulation history that are the shocks for each agent and time.
419 """
420 # Re-initialize the simulation
421 self.initialize_sim()
422 self.simulate()
424 for shock_name in self.shocks:
425 self.shock_history[shock_name] = self.history[shock_name]
427 # Flag that shocks can be read rather than simulated
428 self.read_shocks = True
429 self.clear_history()
431 return self.shock_history
433 def get_mortality(self):
434 """
435 Simulates mortality or agent turnover.
436 Agents die when their states `live` is less than or equal to zero.
437 """
438 who_dies = self.vars_now["live"] <= 0
440 self.sim_birth(who_dies)
442 self.who_dies = who_dies
443 return None
445 def sim_birth(self, which_agents):
446 """
447 Makes new agents for the simulation. Takes a boolean array as an input, indicating which
448 agent indices are to be "born". Does nothing by default, must be overwritten by a subclass.
450 Parameters
451 ----------
452 which_agents : np.array(Bool)
453 Boolean array of size self.agent_count indicating which agents should be "born".
455 Returns
456 -------
457 None
458 """
459 if self.read_shocks:
460 t = self.t_sim
461 initial_vals = {
462 init_var: self.newborn_init_history[init_var][t, which_agents]
463 for init_var in self.initial
464 }
466 else:
467 initial_vals = draw_shocks(self.initial, np.zeros(which_agents.sum()))
469 self._assign_initial_vals(which_agents, initial_vals)
471 self.t_age[which_agents] = 0
472 self.t_cycle[which_agents] = 0
475class MonteCarloSimulator(Simulator):
476 """
477 A Monte Carlo simulation engine based.
479 Unlike the AgentTypeMonteCarloSimulator HARK.core.AgentType,
480 this class does make any assumptions about aging or mortality.
481 It operates only on model information passed in as blocks.
483 It also does not have read_shocks functionality;
484 it is a strict subset of the AgentTypeMonteCarloSimulator functionality.
486 Parameters
487 ------------
489 calibration: Mapping[str, Any]
491 block : DBlock
492 Has shocks, dynamics, and rewards
494 dr: Mapping[str, Callable]
496 initial: dict
498 seed : int
499 A seed for this instance's random number generator.
501 Attributes
502 ----------
503 agent_count : int
504 The number of agents of this type to use in simulation.
506 T_sim : int
507 The number of periods to simulate.
508 """
510 state_vars = []
512 def __init__(
513 self, calibration, block: DBlock, dr, initial, seed=0, agent_count=1, T_sim=10
514 ):
515 super().__init__()
516 # TODO: pass agent_count in at block level
517 self._setup_block_state(
518 calibration, block, dr, initial, seed, agent_count, T_sim
519 )
521 def initialize_sim(self):
522 """
523 Prepares for a new simulation. Resets the internal random number generator,
524 makes initial states for all agents (using sim_birth), clears histories of tracked variables.
525 """
526 all_agents = self._start_sim()
527 self._init_newborn_init_history()
528 self._finish_sim_init(all_agents)
529 return None
531 def sim_one_period(self):
532 """
533 Simulates one period for this type. Calls the methods get_mortality(), get_shocks() or
534 read_shocks, get_states(), get_controls(), and get_poststates(). These should be defined for
535 AgentType subclasses, except get_mortality (define its components sim_death and sim_birth
536 instead) and read_shocks.
537 """
538 self._rotate_state_buffers()
540 shocks_now = draw_shocks(
541 self.shocks,
542 np.zeros(self.agent_count), # TODO: stupid hack to remove age calculations.
543 # this needs a little more thought
544 )
546 pre = self.calibration.copy() # for AgentTypeMC, this is conditional on age
547 # TODO: generalize indexing into calibration.
549 pre.update(self.vars_prev)
550 pre.update(shocks_now)
552 # Won't work for 3.8: self.parameters | self.vars_prev | shocks_now
554 dr = self.dr # AgentTypeMC chooses rule by age;
555 # that generalizes to age as a DR argument?
557 post = simulate_dynamics(self.dynamics, pre, dr)
559 for r in self.block.reward:
560 post[r] = apply_fun_to_vals(self.block.reward[r], post)
562 self.vars_now = post
564 def sim_birth(self, which_agents):
565 """
566 Makes new agents for the simulation. Takes a boolean array as an input, indicating which
567 agent indices are to be "born". Does nothing by default, must be overwritten by a subclass.
569 Parameters
570 ----------
571 which_agents : np.array(Bool)
572 Boolean array of size self.agent_count indicating which agents should be "born".
574 Returns
575 -------
576 None
577 """
579 initial_vals = draw_shocks(self.initial, np.zeros(which_agents.sum()))
581 self._assign_initial_vals(which_agents, initial_vals)