Coverage for HARK/simulation/monte_carlo.py: 97%
219 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-02 05:14 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-02 05:14 +0000
1"""
2Functions to support Monte Carlo simulation of models.
3"""
5from copy import copy
6from typing import Mapping, Sequence
8import numpy as np
10from HARK.distributions import (
11 Distribution,
12 IndexDistribution,
13)
14from HARK.model import Aggregate
15from HARK.model import DBlock
16from HARK.model import construct_shocks, simulate_dynamics
18from HARK.utilities import apply_fun_to_vals
21def draw_shocks(shocks: Mapping[str, Distribution], conditions: Sequence[int]):
22 """
23 Draw from each shock distribution values, subject to given conditions.
25 Parameters
26 ------------
27 shocks Mapping[str, Distribution]
28 A dictionary-like mapping from shock names to distributions from which to draw
30 conditions: Sequence[int]
31 An array of conditions, one for each agent.
32 Typically these will be agent ages.
33 # TODO: generalize this to wider range of inputs.
35 Parameters
36 ------------
37 draws : Mapping[str, Sequence]
38 A mapping from shock names to drawn shock values.
39 """
40 draws = {}
42 for shock_var in shocks:
43 shock = shocks[shock_var]
45 if isinstance(shock, (int, float)):
46 draws[shock_var] = np.ones(len(conditions)) * shock
47 elif isinstance(shock, Aggregate):
48 draws[shock_var] = shock.dist.draw(1)[0]
49 elif isinstance(shock, IndexDistribution):
50 draws[shock_var] = shock.draw(conditions)
51 else:
52 draws[shock_var] = shock.draw(len(conditions))
53 # this is hacky if there are no conditions.
55 return draws
58def calibration_by_age(ages, calibration):
59 """
60 Returns calibration for this model, but with vectorized
61 values which map age-varying values to agent ages.
63 Parameters
64 ----------
65 ages: np.array
66 An array of agent ages.
68 calibration: dict
69 A calibration dictionary
71 Returns
72 --------
73 aged_calibration: dict
74 A dictionary of parameter values.
75 If a parameter is age-varying, the value is a vector
76 corresponding to the values for each input age.
77 """
79 def aged_param(ages, p_value):
80 if isinstance(p_value, (float, int)) or callable(p_value):
81 return p_value
82 elif isinstance(p_value, list) and len(p_value) > 1:
83 pv_array = np.array(p_value)
85 return np.apply_along_axis(lambda a: pv_array[a], 0, ages)
86 else:
87 return np.empty(ages.size)
89 return {p: aged_param(ages, calibration[p]) for p in calibration}
92class Simulator:
93 pass
96class AgentTypeMonteCarloSimulator(Simulator):
97 """
98 A Monte Carlo simulation engine based on the HARK.core.AgentType framework.
100 Unlike HARK.core.AgentType, this class does not do any model solving,
101 and depends on dynamic equations, shocks, and decision rules paased into it.
103 The purpose of this class is to provide a way to simulate models without
104 relying on inheritance from the AgentType class.
106 This simulator makes assumptions about population birth and mortality which
107 are not generic. All agents are replaced with newborns when they expire.
109 Parameters
110 ------------
112 calibration: Mapping[str, Any]
114 block : DBlock
115 Has shocks, dynamics, and rewards
117 dr: Mapping[str, Callable]
119 initial: dict
121 seed : int
122 A seed for this instance's random number generator.
124 Attributes
125 ----------
126 agent_count : int
127 The number of agents of this type to use in simulation.
129 T_sim : int
130 The number of periods to simulate.
131 """
133 state_vars = []
135 def __init__(
136 self, calibration, block: DBlock, dr, initial, seed=0, agent_count=1, T_sim=10
137 ):
138 super().__init__()
140 self.calibration = calibration
141 self.block = block
143 # shocks are exogenous (but for age) but can depend on calibration
144 raw_shocks = block.get_shocks()
145 self.shocks = construct_shocks(raw_shocks, calibration)
147 self.dynamics = block.get_dynamics()
148 self.dr = dr
149 self.initial = initial
151 self.seed = seed # NOQA
152 self.agent_count = agent_count
153 self.T_sim = T_sim
155 # changes here from HARK.core.AgentType
156 self.vars = block.get_vars()
158 self.vars_now = {v: None for v in self.vars}
159 self.vars_prev = self.vars_now.copy()
161 self.read_shocks = False # NOQA
162 self.shock_history = {}
163 self.newborn_init_history = {}
164 self.history = {}
166 self.reset_rng() # NOQA
168 def reset_rng(self):
169 """
170 Reset the random number generator for this type.
171 """
172 self.RNG = np.random.default_rng(self.seed)
174 def initialize_sim(self):
175 """
176 Prepares for a new simulation. Resets the internal random number generator,
177 makes initial states for all agents (using sim_birth), clears histories of tracked variables.
178 """
179 if self.T_sim <= 0:
180 raise Exception(
181 "T_sim represents the largest number of observations "
182 + "that can be simulated for an agent, and must be a positive number."
183 )
185 self.reset_rng()
186 self.t_sim = 0
187 all_agents = np.ones(self.agent_count, dtype=bool)
188 blank_array = np.empty(self.agent_count)
189 blank_array[:] = np.nan
190 for var in self.vars:
191 if self.vars_now[var] is None:
192 self.vars_now[var] = copy(blank_array)
194 self.t_age = np.zeros(
195 self.agent_count, dtype=int
196 ) # Number of periods since agent entry
197 self.t_cycle = np.zeros(
198 self.agent_count, dtype=int
199 ) # Which cycle period each agent is on
201 # Get recorded newborn conditions or initialize blank history.
202 if self.read_shocks and bool(self.newborn_init_history):
203 for init_var_name in self.initial:
204 self.vars_now[init_var_name] = self.newborn_init_history[init_var_name][
205 self.t_sim, :
206 ]
207 else:
208 for var_name in self.initial:
209 self.newborn_init_history[var_name] = (
210 np.zeros((self.T_sim, self.agent_count)) + np.nan
211 )
213 self.sim_birth(all_agents)
215 self.clear_history()
216 return None
218 def sim_one_period(self):
219 """
220 Simulates one period for this type. Calls the methods get_mortality(), get_shocks() or
221 read_shocks, get_states(), get_controls(), and get_poststates(). These should be defined for
222 AgentType subclasses, except get_mortality (define its components sim_death and sim_birth
223 instead) and read_shocks.
224 """
225 # Mortality adjusts the agent population
226 self.get_mortality() # Replace some agents with "newborns"
228 # state_{t-1}
229 for var in self.vars:
230 self.vars_prev[var] = self.vars_now[var]
232 if isinstance(self.vars_now[var], np.ndarray):
233 self.vars_now[var] = np.empty(self.agent_count)
234 self.vars_now[var][:] = np.nan
235 else:
236 # Probably an aggregate variable. It may be getting set by the Market.
237 pass
239 shocks_now = {}
241 if self.read_shocks: # If shock histories have been pre-specified, use those
242 for var_name in self.shocks:
243 shocks_now[var_name] = self.shock_history[var_name][self.t_sim, :]
244 else:
245 ### BIG CHANGES HERE from HARK.core.AgentType
246 shocks_now = draw_shocks(self.shocks, self.t_age)
248 pre = calibration_by_age(self.t_age, self.calibration)
250 pre.update(self.vars_prev)
251 pre.update(shocks_now)
252 # Won't work for 3.8: self.parameters | self.vars_prev | shocks_now
254 # Age-varying decision rules captured here
255 dr = calibration_by_age(self.t_age, self.dr)
257 post = simulate_dynamics(self.dynamics, pre, dr)
259 self.vars_now = post
260 ### BIG CHANGES HERE
262 # Advance time for all agents
263 self.t_age = self.t_age + 1 # Age all consumers by one period
265 # What will we do with cycles?
266 # self.t_cycle = self.t_cycle + 1 # Age all consumers within their cycle
267 # self.t_cycle[
268 # self.t_cycle == self.T_cycle
269 # ] = 0 # Resetting to zero for those who have reached the end
271 def make_shock_history(self):
272 """
273 Makes a pre-specified history of shocks for the simulation. Shock variables should be named
274 in self.shock, a mapping from shock names to distributions. This method runs a subset
275 of the standard simulation loop by simulating only mortality and shocks; each variable named
276 in shocks is stored in a T_sim x agent_count array in history dictionary self.history[X].
277 Automatically sets self.read_shocks to True so that these pre-specified shocks are used for
278 all subsequent calls to simulate().
280 Returns
281 -------
282 shock_history: dict
283 The subset of simulation history that are the shocks for each agent and time.
284 """
285 # Re-initialize the simulation
286 self.initialize_sim()
287 self.simulate()
289 for shock_name in self.shocks:
290 self.shock_history[shock_name] = self.history[shock_name]
292 # Flag that shocks can be read rather than simulated
293 self.read_shocks = True
294 self.clear_history()
296 return self.shock_history
298 def get_mortality(self):
299 """
300 Simulates mortality or agent turnover.
301 Agents die when their states `live` is less than or equal to zero.
302 """
303 who_dies = self.vars_now["live"] <= 0
305 self.sim_birth(who_dies)
307 self.who_dies = who_dies
308 return None
310 def sim_birth(self, which_agents):
311 """
312 Makes new agents for the simulation. Takes a boolean array as an input, indicating which
313 agent indices are to be "born". Does nothing by default, must be overwritten by a subclass.
315 Parameters
316 ----------
317 which_agents : np.array(Bool)
318 Boolean array of size self.agent_count indicating which agents should be "born".
320 Returns
321 -------
322 None
323 """
324 if self.read_shocks:
325 t = self.t_sim
326 initial_vals = {
327 init_var: self.newborn_init_history[init_var][t, which_agents]
328 for init_var in self.initial
329 }
331 else:
332 initial_vals = draw_shocks(self.initial, np.zeros(which_agents.sum()))
334 if np.sum(which_agents) > 0:
335 for varn in initial_vals:
336 self.vars_now[varn][which_agents] = initial_vals[varn]
337 self.newborn_init_history[varn][self.t_sim, which_agents] = (
338 initial_vals[varn]
339 )
341 self.t_age[which_agents] = 0
342 self.t_cycle[which_agents] = 0
344 def simulate(self, sim_periods=None):
345 """
346 Simulates this agent type for a given number of periods. Defaults to
347 self.T_sim if no input.
348 Records histories of attributes named in self.track_vars in
349 self.history[varname].
351 Parameters
352 ----------
353 None
355 Returns
356 -------
357 history : dict
358 The history tracked during the simulation.
359 """
360 if not hasattr(self, "t_sim"):
361 raise Exception(
362 "It seems that the simulation variables were not initialize before calling "
363 + "simulate(). Call initialize_sim() to initialize the variables before calling simulate() again."
364 )
365 if sim_periods is not None and self.T_sim < sim_periods:
366 raise Exception(
367 "To simulate, sim_periods has to be larger than the maximum data set size "
368 + "T_sim. Either increase the attribute T_sim of this agent type instance "
369 + "and call the initialize_sim() method again, or set sim_periods <= T_sim."
370 )
372 # Ignore floating point "errors". Numpy calls it "errors", but really it's excep-
373 # tions with well-defined answers such as 1.0/0.0 that is np.inf, -1.0/0.0 that is
374 # -np.inf, np.inf/np.inf is np.nan and so on.
375 with np.errstate(
376 divide="ignore", over="ignore", under="ignore", invalid="ignore"
377 ):
378 if sim_periods is None:
379 sim_periods = self.T_sim
381 for t in range(sim_periods):
382 self.sim_one_period()
384 # track all the vars -- shocks and dynamics
385 for var_name in self.vars:
386 self.history[var_name][self.t_sim, :] = self.vars_now[var_name]
388 self.t_sim += 1
390 return self.history
392 def clear_history(self):
393 """
394 Clears the histories.
395 """
396 for var_name in self.vars:
397 self.history[var_name] = np.empty((self.T_sim, self.agent_count))
398 self.history[var_name].fill(np.nan)
401class MonteCarloSimulator(Simulator):
402 """
403 A Monte Carlo simulation engine based.
405 Unlike the AgentTypeMonteCarloSimulator HARK.core.AgentType,
406 this class does make any assumptions about aging or mortality.
407 It operates only on model information passed in as blocks.
409 It also does not have read_shocks functionality;
410 it is a strict subset of the AgentTypeMonteCarloSimulator functionality.
412 Parameters
413 ------------
415 calibration: Mapping[str, Any]
417 block : DBlock
418 Has shocks, dynamics, and rewards
420 dr: Mapping[str, Callable]
422 initial: dict
424 seed : int
425 A seed for this instance's random number generator.
427 Attributes
428 ----------
429 agent_count : int
430 The number of agents of this type to use in simulation.
432 T_sim : int
433 The number of periods to simulate.
434 """
436 state_vars = []
438 def __init__(
439 self, calibration, block: DBlock, dr, initial, seed=0, agent_count=1, T_sim=10
440 ):
441 super().__init__()
443 self.calibration = calibration
444 self.block = block
446 # shocks are exogenous (but for age) but can depend on calibration
447 raw_shocks = block.get_shocks()
448 self.shocks = construct_shocks(raw_shocks, calibration)
450 self.dynamics = block.get_dynamics()
451 self.dr = dr
452 self.initial = initial
454 self.seed = seed # NOQA
455 self.agent_count = agent_count # TODO: pass this in at block level
456 self.T_sim = T_sim
458 # changes here from HARK.core.AgentType
459 self.vars = block.get_vars()
461 self.vars_now = {v: None for v in self.vars}
462 self.vars_prev = self.vars_now.copy()
464 self.shock_history = {}
465 self.newborn_init_history = {}
466 self.history = {}
468 self.reset_rng() # NOQA
470 def reset_rng(self):
471 """
472 Reset the random number generator for this type.
473 """
474 self.RNG = np.random.default_rng(self.seed)
476 def initialize_sim(self):
477 """
478 Prepares for a new simulation. Resets the internal random number generator,
479 makes initial states for all agents (using sim_birth), clears histories of tracked variables.
480 """
481 if self.T_sim <= 0:
482 raise Exception(
483 "T_sim represents the largest number of observations "
484 + "that can be simulated for an agent, and must be a positive number."
485 )
487 self.reset_rng()
488 self.t_sim = 0
489 all_agents = np.ones(self.agent_count, dtype=bool)
490 blank_array = np.empty(self.agent_count)
491 blank_array[:] = np.nan
492 for var in self.vars:
493 if self.vars_now[var] is None:
494 self.vars_now[var] = copy(blank_array)
496 self.t_cycle = np.zeros(
497 self.agent_count, dtype=int
498 ) # Which cycle period each agent is on
500 for var_name in self.initial:
501 self.newborn_init_history[var_name] = (
502 np.zeros((self.T_sim, self.agent_count)) + np.nan
503 )
505 self.sim_birth(all_agents)
507 self.clear_history()
508 return None
510 def sim_one_period(self):
511 """
512 Simulates one period for this type. Calls the methods get_mortality(), get_shocks() or
513 read_shocks, get_states(), get_controls(), and get_poststates(). These should be defined for
514 AgentType subclasses, except get_mortality (define its components sim_death and sim_birth
515 instead) and read_shocks.
516 """
518 # state_{t-1}
519 for var in self.vars:
520 self.vars_prev[var] = self.vars_now[var]
522 if isinstance(self.vars_now[var], np.ndarray):
523 self.vars_now[var] = np.empty(self.agent_count)
524 self.vars_now[var][:] = np.nan
525 else:
526 # Probably an aggregate variable. It may be getting set by the Market.
527 pass
529 shocks_now = {}
531 shocks_now = draw_shocks(
532 self.shocks,
533 np.zeros(self.agent_count), # TODO: stupid hack to remove age calculations.
534 # this needs a little more thought
535 )
537 pre = self.calibration.copy() # for AgentTypeMC, this is conditional on age
538 # TODO: generalize indexing into calibration.
540 pre.update(self.vars_prev)
541 pre.update(shocks_now)
543 # Won't work for 3.8: self.parameters | self.vars_prev | shocks_now
545 dr = self.dr # AgentTypeMC chooses rule by age;
546 # that generalizes to age as a DR argument?
548 post = simulate_dynamics(self.dynamics, pre, dr)
550 for r in self.block.reward:
551 post[r] = apply_fun_to_vals(self.block.reward[r], post)
553 self.vars_now = post
555 def sim_birth(self, which_agents):
556 """
557 Makes new agents for the simulation. Takes a boolean array as an input, indicating which
558 agent indices are to be "born". Does nothing by default, must be overwritten by a subclass.
560 Parameters
561 ----------
562 which_agents : np.array(Bool)
563 Boolean array of size self.agent_count indicating which agents should be "born".
565 Returns
566 -------
567 None
568 """
570 initial_vals = draw_shocks(self.initial, np.zeros(which_agents.sum()))
572 if np.sum(which_agents) > 0:
573 for varn in initial_vals:
574 self.vars_now[varn][which_agents] = initial_vals[varn]
575 self.newborn_init_history[varn][self.t_sim, which_agents] = (
576 initial_vals[varn]
577 )
579 def simulate(self, sim_periods=None):
580 """
581 Simulates this agent type for a given number of periods. Defaults to
582 self.T_sim if no input.
584 Records histories of attributes named in self.track_vars in
585 self.history[varname].
587 Parameters
588 ----------
589 sim_periods : int
590 Number of periods to simulate.
592 Returns
593 -------
594 history : dict
595 The history tracked during the simulation.
596 """
597 if not hasattr(self, "t_sim"):
598 raise Exception(
599 "It seems that the simulation variables were not initialize before calling "
600 + "simulate(). Call initialize_sim() to initialize the variables before calling simulate() again."
601 )
602 if sim_periods is not None and self.T_sim < sim_periods:
603 raise Exception(
604 "To simulate, sim_periods has to be larger than the maximum data set size "
605 + "T_sim. Either increase the attribute T_sim of this agent type instance "
606 + "and call the initialize_sim() method again, or set sim_periods <= T_sim."
607 )
609 # Ignore floating point "errors". Numpy calls it "errors", but really it's excep-
610 # tions with well-defined answers such as 1.0/0.0 that is np.inf, -1.0/0.0 that is
611 # -np.inf, np.inf/np.inf is np.nan and so on.
612 with np.errstate(
613 divide="ignore", over="ignore", under="ignore", invalid="ignore"
614 ):
615 if sim_periods is None:
616 sim_periods = self.T_sim
618 for t in range(sim_periods):
619 self.sim_one_period()
621 # track all the vars -- shocks and dynamics
622 for var_name in self.vars:
623 self.history[var_name][self.t_sim, :] = self.vars_now[var_name]
625 self.t_sim += 1
627 return self.history
629 def clear_history(self):
630 """
631 Clears the histories.
632 """
633 for var_name in self.vars:
634 self.history[var_name] = np.empty((self.T_sim, self.agent_count))
635 self.history[var_name].fill(np.nan)