Source code for abm_project.vectorised_model

"""Vectorised implementation of the OOP model."""

from collections.abc import Callable

import numpy as np
import numpy.typing as npt
import scipy.optimize

from .oop_model import BaseModel
from .utils import lattice2d, piecewise_exponential_update

type EnvUpdateFn = Callable[
    [npt.NDArray[np.float64], npt.NDArray[np.int64]], npt.NDArray[np.float64]
]
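
# The concrete environment-update functions live in `abm_project.utils`
# (e.g. `piecewise_exponential_update`, used as the default in `__init__`).
# Purely as an illustration of the expected EnvUpdateFn signature, a
# hypothetical linear update (not part of the package API) could look like:
def _example_linear_update(rate: float = 0.05) -> EnvUpdateFn:
    """Illustrative sketch only: build a linear EnvUpdateFn."""

    def update(
        n: npt.NDArray[np.float64], a: npt.NDArray[np.int64]
    ) -> npt.NDArray[np.float64]:
        # Environment degrades when agents defect (a = -1) and recovers when
        # they cooperate (a = 1), clipped to the unit interval.
        return np.clip(n + rate * a, 0.0, 1.0)

    return update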


class VectorisedModel:
    """Vectorised base model for agent-based simulations.

    This model comprises a 2D lattice of agents who repeatedly choose between
    cooperation (pro-environmental behaviour) and defection, based on the state
    of their local environment and the social norms imposed by their direct
    neighbors. Agents have heterogeneous attributes which weight the respective
    contributions of environmental concern and social norms in the
    decision-making process.

    Attributes:
        action (npt.NDArray[np.int64]): 2D array of agents' actions with shape
            (time, agent).
        environment (npt.NDArray[np.float64]): 2D array of agents' environments
            with shape (time, agent).
        s (npt.NDArray[np.float64]): 2D array of agents' support for
            cooperation with shape (time, agent).
        b (npt.NDArray[np.float64]): 2D array of agents' decision-making
            weights, shape (attributes, agent).
        rationality (float): Homogeneous rationality coefficient for all
            agents.
        adj (npt.NDArray[np.float64]): Normalised adjacency matrix with shape
            (agent, agent).
        time (int): Current time step in the simulation.
        num_agents (int): Total number of agents in the grid.
        width (int): Width of the grid.
        height (int): Height of the grid.
        simmer_time (int): Number of agent adaptation steps between environment
            updates.
        rng (np.random.Generator): Random number generator for stochastic
            processes.
    """

    DEFAULT_NUM_AGENTS = BaseModel.DEFAULT_NUM_AGENTS
    DEFAULT_WIDTH = BaseModel.DEFAULT_WIDTH
    DEFAULT_HEIGHT = BaseModel.DEFAULT_HEIGHT
    DEFAULT_MEMORY_COUNT = BaseModel.DEFAULT_MEMORY_COUNT
    DEFAULT_MAX_STORAGE = 1000
    DEFAULT_SIMMER_TIME = 1
    DEFAULT_NEIGHB_PREDICTION_OPTION = "linear"  # "logistic", None
    DEFAULT_SEVERITY_BENEFIT_OPTION = "adaptive"  # None
    DEFAULT_RADIUS_OPTION = "single"  # "all"

    ACTIONS = [-1, 1]
    N_WEIGHTS = 2

    def __init__(
        self,
        num_agents: int = DEFAULT_NUM_AGENTS,
        width: int = DEFAULT_WIDTH,
        height: int = DEFAULT_HEIGHT,
        memory_count: int = DEFAULT_MEMORY_COUNT,
        env_update_fn: EnvUpdateFn | None = None,
        rng: np.random.Generator | None = None,
        rationality: float = 1.0,
        max_storage: int = DEFAULT_MAX_STORAGE,
        moore: bool = True,
        simmer_time: int = DEFAULT_SIMMER_TIME,
        neighb_prediction_option: str = DEFAULT_NEIGHB_PREDICTION_OPTION,
        severity_benefit_option: str = DEFAULT_SEVERITY_BENEFIT_OPTION,
        radius_option: str = DEFAULT_RADIUS_OPTION,
        prop_pessimistic: float = 0,
        pessimism_level: float = 1,
        randomise: bool = True,
        b_1: npt.NDArray[np.float64] | None = None,
        b_2: npt.NDArray[np.float64] | None = None,
        gamma_s: float = 0.01,
    ):
        """Construct a new vectorised model.

        Args:
            num_agents: Number of agents in the lattice.
            width: Number of agents in the horizontal span of the lattice.
            height: Number of agents in the vertical span of the lattice.
            memory_count: Length of agents' memory.
            env_update_fn: Function which accepts the current environment and
                action, and returns an updated environment status.
            rng: Random number generator. If None, a new default generator is
                created.
            rationality: Agent rationality. Lower is more random, higher is
                more rational.
            max_storage: Maximum number of timesteps to record in history.
                Safeguards against runaway memory use.
            moore: Include diagonal neighbors.
            simmer_time: Number of agent adaptation steps between environment
                updates.
            neighb_prediction_option: Method for predicting neighbors' actions.
            severity_benefit_option: Method for calculating the benefit of
                cooperating in a healthy environment.
            radius_option: Method for determining the radius of neighbors to
                consider when calculating neighbors' actions. Options are
                "single" (default) for an agent's immediate neighborhood, or
                "all" for the whole population.
            prop_pessimistic: Proportion of agents to set as pessimistic.
            pessimism_level: How much pessimistic agents overestimate
                environmental degradation. Higher is more pessimistic. The
                default (1) is no pessimism.
            randomise: Randomise the initial environment and actions. If False,
                initialises with a healthy environment and global inaction.
            b_1: Initial weight for the first attribute (e.g., environmental
                concern).
            b_2: Initial weight for the second attribute (e.g., social norms).
            gamma_s: Rate at which agents change their action preferences.
        """
        self.time = 0
        self.num_agents = num_agents
        self.width = width
        self.height = height
        self.memory_count = memory_count
        # Alternative: linear_update(0.05)
        self.env_update_fn = env_update_fn or piecewise_exponential_update(
            1, 1, 0.01
        )
        self.rng = rng or np.random.default_rng()
        self.max_storage = max_storage + 1
        self.simmer_time = simmer_time
        self.neighb_prediction_option = neighb_prediction_option
        self.severity_benefit_option = severity_benefit_option
        self.radius_option = radius_option

        # Set up agents' connections and attributes
        self.adj = lattice2d(width, height, periodic=True, diagonals=moore)
        self.rationality = rationality
        self.b = np.zeros(
            (self.N_WEIGHTS, self.num_agents),
            dtype=np.float64,
        )
        if b_1 is None:
            b_1 = self.rng.random(self.num_agents)
        if b_2 is None:
            b_2 = self.rng.random(self.num_agents)
        self.b[0] = b_1
        self.b[1] = b_2
        self.b = self.b / self.b.sum(axis=0, keepdims=True)
        self.gamma_s = gamma_s

        # Set strategy change params
        # alpha: rate of increasing support when support is low
        # beta: rate of decreasing support when support is high
        self.alpha = 1
        self.beta = 1

        pessimistic = self.rng.random(self.num_agents) < prop_pessimistic
        self.pessimism = np.ones(self.num_agents)
        self.pessimism[pessimistic] = pessimism_level

        # Initialise agents and environment
        self.initialise(zero=not randomise)
        self.initial_action = self.action[: self.memory_count].copy()
        self.initial_environment = self.environment[: self.memory_count].copy()

    def initialise(self, zero: bool = False):
        """Initialise agents' and environment state.

        Optionally (`zero=True`) sets all agents' initial actions to -1
        (defection) and their environments to one (healthy). After initialising
        the environment, runs agent adaptation for 100 steps to reach stable
        support levels (for cooperation) given agent heterogeneity.

        Args:
            zero: Initialise the environment as healthy and all agents as
                defecting, rather than randomising both.
        """
        self.action = -np.ones(
            (self.max_storage, self.num_agents),
            dtype=np.int64,
        )
        self.environment = np.ones(
            (self.max_storage, self.num_agents),
            dtype=np.float64,
        )
        self.s = np.zeros(
            (self.max_storage, self.num_agents),
            dtype=np.float64,
        )
        self.curr_s = np.zeros(self.num_agents, dtype=np.float64) + 0.5
        if not zero:
            self.action[: self.memory_count] = self.rng.choice(
                self.ACTIONS,
                size=(self.memory_count, self.num_agents),
            )
            self.environment[: self.memory_count] = self.rng.random(
                size=(self.memory_count, self.num_agents), dtype=np.float64
            )
        for _ in range(100):
            self.adapt(self.environment[0])

    def reset(self):
        """Re-initialise agents to original actions and environment states."""
        self.initialise(zero=True)
        self.action[: self.memory_count] = self.initial_action
        self.environment[: self.memory_count] = self.initial_environment

    def run(self, steps: int = 20):
        """Run simulation for specified number of steps.

        Args:
            steps: Number of steps to iterate.
        """
        if steps > self.environment.shape[0]:
            raise RuntimeError("Maximum steps exceeded. Raise `max_storage`.")
        self.r = self.rng.random(size=(steps * self.simmer_time, self.num_agents))
        for _ in range(steps):
            self.step()

    def step(self):
        """Execute a single simulation step.

        A step comprises the following processes:

        1. Increment the simulation time.
        2. Update the environment based on agents' actions in the previous
           timestep.
        3. Run a fixed number of agent decision-making and adaptation steps.
        """
        self.time += 1
        self.update_env()
        self.simmer()
        self.adapt(self.environment[self.time - 1])
        self.s[self.time] = self.curr_s.copy()

    def update_env(self):
        """Update each agent's environment based on their last action."""
        self.environment[self.time] = self.env_update_fn(
            self.environment[self.time - 1],
            self.action[self.time - 1],
        )

    def simmer(self):
        """Simulate a number of agent decision-making and adaptation steps.

        A step comprises the following processes:

        1. Agents choose an action based on their current support for
           cooperation, and the social norms imposed by their neighbors.
        2. Agents adapt their support for cooperation based on the current
           state of the environment.

        Note that the environment is fixed during this process. As such, a
        longer simmer time reflects a faster rate of behavioural change
        relative to the rate of environmental change.
        """
        for i in range(self.simmer_time):
            self.decide(i)

    def decide(self, i: int):
        """Select a new action for each agent.

        The probability of selecting each action is set by an agent's logit
        model, based on their current environment and the social norms imposed
        by their neighbors. To select a new action, we sample a random number
        in [0,1] for each agent. If it does not exceed the probability of
        cooperation, the agent cooperates; otherwise it defects.

        Args:
            i: Simmer step index.
        """
        pa = self.action_probabilities()
        r = self.r[((self.time - 1) * self.simmer_time) + i]
        self.action[self.time] = np.where(r < pa[1], 1, -1)
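        # For example, if an agent's cooperation probability pa[1] is 0.7, a
        # draw of r = 0.4 yields cooperation (+1), while r = 0.9 yields
        # defection (-1).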

    def adapt(self, n: npt.NDArray[np.float64]):
        r"""Update agents' support for cooperation.

        Agents' support for cooperation changes as a function of the current
        environment. It decreases when the environment is either particularly
        healthy (no reason to act) or particularly unhealthy (no point in
        acting). It increases when the environment is not at either of these
        extremes. We write the change in support as a derivative:

        .. math::
            \frac{ds_i}{dt} = \alpha_i \sigma(n_i) (4 - s_i(t))
                - \beta_i (1 - \sigma(n_i)) s_i(t)

        where :math:`\sigma(n_i) = 4 n_i (1 - n_i)`, so support saturates at 4
        (full support) and at 0 (no support).

        Args:
            n: Current state of the environment, with shape (agent,)
        """
        logistic = 4 * n * (1 - n)
        # Scale derivative so it is zero at the boundaries
        ds_dt = (
            self.alpha * logistic * (4 - self.curr_s)
            - self.beta * (1 - logistic) * self.curr_s
        )
        self.curr_s += self.gamma_s * ds_dt
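        # Worked example: with alpha = beta = 1, gamma_s = 0.01 and curr_s = 2,
        # an environment of n = 0.5 gives logistic = 1, so ds_dt = 1 * (4 - 2)
        # = 2 and support rises by 0.02 this step; at n = 0 or n = 1,
        # logistic = 0 and ds_dt = -2, so support falls by the same amount.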

    def pred_neighb_action(self) -> npt.NDArray[np.float64]:
        """Predict the average action of peers based on their recent actions.

        Uses the last `memory_count` recorded actions, and the prediction
        method given by `neighb_prediction_option`.

        Returns:
            npt.NDArray[np.float64]: Predicted average action of neighbors for
                each agent.
        """
        # Get the recent actions for each agent (shape: memory, num_agents)
        start = max(0, self.time - self.memory_count)
        stop = self.time
        actions = self.action[start:stop]  # shape: (memory, num_agents)

        if actions.shape[0] < 2:
            if self.neighb_prediction_option == "true_pref":
                return self.adj @ self.s[self.time]
            return self.mean_local_action(memory=self.memory_count)

        if self.neighb_prediction_option == "linear":
            # Fit a straight line through each agent's recent actions and
            # extrapolate one step ahead.
            time_steps = np.arange(actions.shape[0])
            coeffs = np.polyfit(time_steps, actions, 1)
            predicted = np.polyval(coeffs, actions.shape[0])
            neighb_pred = self.adj @ predicted
            return np.where(neighb_pred >= 0, 1, -1)
        elif self.neighb_prediction_option == "logistic":
            # For logistic regression, map actions from -1/1 to 0/1
            act_bin = (actions + 1) // 2
            time_steps = np.arange(actions.shape[0])
            log_time = np.log(time_steps + 1)
            predicted_probs = np.zeros(self.num_agents)
            for i in range(self.num_agents):
                try:
                    popt, _ = scipy.optimize.curve_fit(
                        lambda t, a, b, c: a / (1 + np.exp(-b * (t - c))),
                        log_time,
                        act_bin[:, i],
                        maxfev=10000,
                        bounds=([0, -np.inf, -np.inf], [1, np.inf, np.inf]),
                    )
                    pred_prob = popt[0] / (
                        1 + np.exp(-popt[1] * (np.log(actions.shape[0] + 1) - popt[2]))
                    )
                except Exception:
                    pred_prob = np.mean(act_bin[:, i])
                predicted_probs[i] = pred_prob
            neighb_pred = self.adj @ predicted_probs
            return np.where(neighb_pred >= 0.5, 1, -1)
        else:
            return self.mean_local_action(memory=self.memory_count)

    def action_probabilities(self) -> npt.NDArray[np.float64]:
        r"""Calculate the probability of each possible action.

        The probabilities are calculated using a logit softmax function over
        the utilities of each action. The formula is:

        .. math::
            P(a_i(t) = a) = \frac{\exp(\lambda \cdot V_i(a))}
                {\exp(\lambda \cdot V_i(C)) + \exp(\lambda \cdot V_i(D))}

        Where :math:`V_i(a)` is the representative utility for action
        :math:`a` for agent :math:`i`.

        Returns:
            An array of probabilities for each action, with shape (2, agent).
        """
        if self.severity_benefit_option == "adaptive":
            m = self.pred_neighb_action()
            z = self.b[0] * (self.curr_s - 2) + 2 * self.b[1] * m
            pc = 1 / (1 + np.exp(-2 * self.rationality * z))
            return (1 - pc, pc)

        utilities = self.rationality * np.array(
            [self.representative_utility(-1), self.representative_utility(1)]
        )
        exp_utilities = np.exp(utilities - np.max(utilities, axis=0))
        return exp_utilities / exp_utilities.sum(axis=0)
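        # Note: under the "adaptive" option, V_i(C) - V_i(D) = 2z once the
        # adaptive preference and squared social-pressure terms are substituted
        # in, so the closed-form probability used in that branch matches the
        # general softmax over representative utilities.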

    def representative_utility(self, action: int) -> npt.NDArray[np.float64]:
        r"""Calculate the representative utility of an action for each agent.

        Representative utility combines an agent's individual preference for
        the action (based on their support for cooperation) with the social
        pressure imposed by the agent's neighbors. For the default "adaptive"
        severity-benefit option:

        .. math::
            V_i(a) = b_1 \cdot [a^* \cdot s_i(t) + (1 - a^*) \cdot (4 - s_i(t))]
                - b_2 \cdot (a - \overline{A}_i(t))^2

        Where :math:`a^* = (a + 1)/2` is a transformation of the action to the
        set :math:`\{0,1\}`, and :math:`\overline{A}_i(t)` is the predicted
        action of agent :math:`i`'s neighbors.
        """
        preference = self.calculate_individual_preference(action)
        social_pressure = self.calculate_social_pressure(action)
        return self.b[0] * preference - self.b[1] * social_pressure

    def calculate_individual_preference(self, action: int) -> npt.NDArray[np.float64]:
        """Calculate agents' individual preference for an action."""
        if self.severity_benefit_option == "adaptive":
            a = int((action + 1) / 2)
            individual_preference = a * self.curr_s + (1 - a) * (4 - self.curr_s)
        else:
            individual_preference = -(2 * self.environment[self.time - 1] - 1) * action
        return individual_preference

    def calculate_social_pressure(self, action: int) -> npt.NDArray[np.float64]:
        """Calculate agents' social pressure when taking a given action."""
        return (action - self.pred_neighb_action()) ** 2
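        # With actions in {-1, 1} and a +/-1 neighbor prediction, the pressure
        # is 0 when the agent's action matches the prediction and 4 when it
        # opposes it; with the mean-action fallback the prediction is
        # fractional and the pressure varies continuously.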

    def mean_local_action(self, memory: int = 1) -> npt.NDArray[np.float64]:
        """Calculate the average action in each agent's local neighborhood.

        Args:
            memory: Number of previous timesteps of actions to consider.
                Pass -1 to use the full recorded history.

        Returns:
            The mean local action for each agent, reflecting the perceived
            social norm for each agent at the current timestep. Shape is
            (agent,).
        """
        if self.radius_option == "single":
            if memory == -1:
                memory = self.max_storage
            start = max(0, self.time - memory)
            stop = self.time
            mean_per_timestep = self.adj @ self.action[start:stop].T
            return mean_per_timestep.mean(axis=1)
        elif self.radius_option == "all":
            if memory == -1:
                memory = self.max_storage
            start = max(0, self.time - memory)
            stop = self.time
            mean_per_timestep = self.action[start:stop].mean(axis=1)
            return np.full(self.num_agents, mean_per_timestep.mean())
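

if __name__ == "__main__":
    # Minimal usage sketch (illustrative only; the parameter values here are
    # arbitrary, not defaults taken from the original project).
    model = VectorisedModel(num_agents=100, width=10, height=10)
    model.run(steps=50)
    final_actions = model.action[model.time]
    print(f"Cooperating fraction: {(final_actions == 1).mean():.2f}")
    print(f"Mean environment:     {model.environment[model.time].mean():.2f}")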