Source code for abm_project.utils

"""Utility functions for agent-based models."""

import itertools
from collections.abc import Callable

import networkx as nx
import numpy as np
import numpy.typing as npt
import scipy

type EnvUpdateFn = Callable[
    [npt.NDArray[np.float64], npt.NDArray[np.int64]], npt.NDArray[np.float64]
]


[docs] def sigmoid_update( n: npt.NDArray[np.float64], a: npt.NDArray[np.int64] ) -> npt.NDArray[np.float64]: """Update environment according to sigmoid rule. Args: n: Each agents' current environment, shape: (agents,) a: Each agents' current action, shape: (agents,) Returns: Numpy array of new environment values for each agent, with same shape as `n`. """ sensitivity = 1 / (1 + np.exp(6 * (n - 0.5))) delta = sensitivity * a * 0.05 return n + delta
[docs] def sigmoid(x, a, b): """Sigmoid function for logistic regression. This function defines a sigmoid curve for logistic regression fitting, which maps any real-valued number into the range [0, 1]. Args: x (float or np.ndarray): Input value(s) to the sigmoid function. a (float): Slope of the sigmoid curve. b (float): Offset of the sigmoid curve. Returns: float or np.ndarray: Sigmoid-transformed value(s). """ return 1 / (1 + np.exp(-(a * x + b)))
[docs] def exponential_update(rate: float): r"""Construct exponential environment update function. .. math:: n(t+1) = n(t) + a\cdot r \cdot \exp(-n(t)) Args: rate: Multiplicative coefficient for exponential function. Returns: Exponential update function. """ def inner( n: npt.NDArray[np.float64], a: npt.NDArray[np.int64] ) -> npt.NDArray[np.float64]: """Update environment according to exponential rule. Args: n: Each agents' current environment, shape: (agents,) a: Each agents' current action, shape: (agents,) Returns: Numpy array of new environment values for each agent, with same shape as `n`. """ delta = a * rate * np.exp(-n) return n + delta return inner
[docs] def linear_update(rate: float): r"""Construct linear environment update function. .. math:: n(t+1) = n(t) + a\cdot r The returned value is clipped to the interval [0,1]. Args: rate: Linear step size. Returns: Linear update function. """ def inner( n: npt.NDArray[np.float64], a: npt.NDArray[np.int64] ) -> npt.NDArray[np.float64]: """Update environment according to linear rule. Args: n: Each agents' current environment, shape: (agents,) a: Each agents' current action, shape: (agents,) Returns: Numpy array of new environment values for each agent, with same shape as `n`. """ delta = a * rate return np.clip(n + delta, a_min=0.0, a_max=1.0) return inner
[docs] def piecewise_exponential_update(recovery: float, pollution: float, gamma: float): r"""Construct piecewise exponential environment update function. Args: recovery: Rate of improvement due to good actions pollution: Rate of degradation due to bad actions gamma: Step size for environmental change Returns: Piecewise exponential update function. """ def inner( n: npt.NDArray[np.float64], a: npt.NDArray[np.int64] ) -> npt.NDArray[np.float64]: """Update environment according to piecewise exponential rule. Args: n: Each agents' current environment, shape: (agents,) a: Each agents' current action, shape: (agents,) Returns: Numpy array of new environment values for each agent, with same shape as `n`. """ pc = (a + 1) / 2 improvement = recovery * (1 - n) * pc degradation = pollution * n * (1 - pc) dn_dt = gamma * (improvement - degradation) return n + dn_dt return inner
[docs] def lattice2d(width: int, height: int, periodic: bool = True, diagonals: bool = False): """Construct normalised adjacency matrix for a 2D lattice. Uses networkx to create a 2D lattice with optional periodic boundaries and Moore neighborhoods (diagonals). Converts this to a sparse adjacency matrix with normalised rows, to simplify computing averages over neighborhoods. Args: width: Number of nodes along the horizontal span of the lattice. height: Number of nodes along the vertical span of the lattice. periodic: Connect nodes at the edges of the lattice with periodic boundary conditions. diagonals: Connect nodes to their diagonal neighbors, also known as the Moore neighborhood. Default is the von Neumann neighborhood (cartesian neighbors). Returns: Sparse CSR adjacency matrix with shape (width x height, width x height), normalised per row. """ network = nx.grid_2d_graph(width, height, periodic=periodic) if diagonals: for y, x in itertools.product(range(height), range(width)): for dy, dx in [(-1, -1), (-1, 1), (1, -1), (1, 1)]: ty, tx = y + dy, x + dx if periodic: ty = ty % height tx = tx % width if 0 <= ty < height and 0 <= tx < width: network.add_edge((y, x), (ty, tx)) # Extract normalised adjacency matrix adj = nx.adjacency_matrix(network) adj = adj / adj.sum(axis=1)[:, None] return scipy.sparse.csr_array(adj)