Source code for abm_project.batch_run_tools

"""Batch run tools for ABM project."""

import numpy as np
from scipy.fft import rfft, rfftfreq
from scipy.ndimage import convolve

from abm_project.oop_model import BaseModel


def run_parameter_batch(
    num_runs: int, model_class, steps: int = 100, **kwargs
) -> list[BaseModel]:
    """Run a batch of agent-based model simulations.

    Args:
        num_runs (int): Number of model runs to execute.
        model_class (type): The class of the model to instantiate.
        steps (int): Number of simulation steps to run for each model.
        **kwargs: Additional keyword arguments to pass to the model constructor.

    Returns:
        list[BaseModel]: List of model instances after running the simulation.
    """
    models = []
    for _ in range(num_runs):
        model = model_class(**kwargs)
        model.run(steps)
        models.append(model)
    return models


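# Example usage (illustrative sketch; `MyModel` stands in for any BaseModel
# subclass, and any extra keywords would be that model's constructor arguments):
#
#     models = run_parameter_batch(num_runs=10, model_class=MyModel, steps=200)
#     assert len(models) == 10

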
def extract_mean_and_variance(models, attribute: str):
    """Extract the mean and variance of a specified attribute.

    Args:
        models (list): List of BaseModel instances.
        attribute (str): The name of the attribute history to extract
            (e.g., "agent_action_history").

    Returns:
        tuple: (means, variances), where each is a list of values per time step.
    """
    # Determine the number of time steps in the simulation
    num_steps = len(getattr(models[0], attribute))
    means = []
    variances = []

    # Iterate through each time step
    for t in range(num_steps):
        # Get the attribute grid at time t from each model
        grids = np.array([getattr(model, attribute)[t] for model in models])

        # Compute the mean and variance of values across the grids
        step_means = np.array([np.mean(grid) for grid in grids])
        step_vars = np.array([np.var(grid) for grid in grids])

        # Record the average mean and variance across all model runs
        means.append(np.mean(step_means))
        variances.append(np.mean(step_vars))

    return means, variances


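# Example usage (illustrative sketch, reusing the hypothetical `models` batch
# from the example above; the attribute name is taken from the docstring):
#
#     means, variances = extract_mean_and_variance(models, "agent_action_history")
#     # means[t]: mean value at step t, averaged over all runs.
#     # variances[t]: within-run variance at step t, averaged over all runs.

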
def average_metric_over_time(models, attr, inner_mean=False):
    """Calculate the average of a given attribute over time across all models.

    Args:
        models (list[BaseModel]): List of BaseModel instances.
        attr (str): Attribute name to average (e.g., 'agent_action_history').
        inner_mean (bool): If True, take mean over inner array before averaging
            across models.

    Returns:
        list: Average value at each time step.
    """
    history_length = len(getattr(models[0], attr))
    if inner_mean:
        # For each time step, compute the mean of the inner array for each model,
        # then average these means across all models.
        result = []
        for t in range(history_length):
            inner_means = []
            for m in models:
                # Get the attribute at time t for model m (should be an array-like)
                values = getattr(m, attr)[t]
                mean_value = np.mean(values)
                inner_means.append(mean_value)
            # Average the means across all models for this time step
            avg_mean = np.mean(inner_means)
            result.append(avg_mean)
        return result
    else:
        return [
            np.mean([getattr(m, attr)[t] for m in models])
            for t in range(history_length)
        ]


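# Example usage (illustrative sketch; assumes each per-step entry of the
# history holds one value per agent, so the per-agent mean is taken first):
#
#     avg = average_metric_over_time(models, "agent_action_history", inner_mean=True)
#     # avg[t]: per-agent mean at step t, averaged across all runs.

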
def attribute_variance_over_time(models, attr):
    """Calculate the mean variance of a given attribute over time across all models.

    Args:
        models (list[BaseModel]): List of BaseModel instances.
        attr (str): Attribute name to compute variance over
            (e.g., 'agent_peer_pressure_coeff_history').

    Returns:
        list: Mean variance at each time step.
    """
    history_length = len(getattr(models[0], attr))
    return [
        np.mean([np.var(getattr(m, attr)[t]) for m in models])
        for t in range(history_length)
    ]


def local_action_agreement_score(
    flat_grid: np.ndarray, width: int, height: int, radius: int = 1
) -> float:
    """Compute a spatial clustering score based on local action agreement.

    Args:
        flat_grid (np.ndarray): 1D array of agent actions.
        width (int): Width of the agent grid.
        height (int): Height of the agent grid.
        radius (int): Neighborhood radius.

    Returns:
        float: Clustering score (0 to 1).
    """
    if flat_grid.size != width * height:
        raise ValueError("flat_grid size does not match width x height")

    grid = flat_grid.reshape((height, width))
    kernel = np.ones((2 * radius + 1, 2 * radius + 1), dtype=int)
    kernel[radius, radius] = 0  # Exclude center

    score = 0.0
    for val in np.unique(grid):
        match = (grid == val).astype(int)
        same_neighbors = convolve(match, kernel, mode="constant", cval=0)
        total_neighbors = convolve(np.ones_like(grid), kernel, mode="constant", cval=0)
        local_agreement = same_neighbors / total_neighbors
        score += np.sum(local_agreement * match)

    return score / grid.size


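# Example (illustrative sketch): a grid split into two solid blocks of actions
# scores near 1, since most cells agree with their Moore neighborhood, while a
# well-mixed grid scores lower.
#
#     flat = np.repeat([0, 1], 8)   # 4x4 grid: top half all 0s, bottom half all 1s
#     local_action_agreement_score(flat, width=4, height=4)

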
def clustering_score_over_time(
    model, attribute: str = "action", width: int = 50, height: int = 50, radius: int = 1
) -> list[float]:
    """Compute a clustering score at each time step from the per-step 1D action vector.

    Args:
        model: The model instance.
        attribute (str): Attribute name for the per-timestep 1D agent data.
        width (int): Grid width.
        height (int): Grid height.
        radius (int): Neighborhood radius.

    Returns:
        list[float]: Clustering scores per time step.
    """
    time_series = getattr(model, attribute)
    return [
        local_action_agreement_score(
            flat_grid, width=width, height=height, radius=radius
        )
        for flat_grid in time_series
    ]


class UnionFind:
    """Union-Find data structure for efficient connectivity checks."""

    def __init__(self, size):
        """Initialize Union-Find structure with given size."""
        self.parent = np.arange(size)

    def find(self, x):
        """Find the root of the component containing x with path compression."""
        if self.parent[x] != x:
            self.parent[x] = self.find(self.parent[x])
        return self.parent[x]

    def union(self, x, y):
        """Union the components containing x and y."""
        rx = self.find(x)
        ry = self.find(y)
        if rx != ry:
            self.parent[ry] = rx


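# Example (illustrative sketch): after uniting 0-1 and 1-2, elements 0, 1, and
# 2 share one root, while 3 remains its own component.
#
#     uf = UnionFind(4)
#     uf.union(0, 1)
#     uf.union(1, 2)
#     uf.find(0) == uf.find(2)   # True
#     uf.find(0) == uf.find(3)   # False

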
def analyze_environment_clusters_periodic(
    environment: np.ndarray,
    width: int,
    height: int,
    threshold: float = 0.5,
    diagonal: bool = False,
):
    """Analyze clusters in a 2D environment with periodic boundaries.

    Args:
        environment: 1D array of environment values per agent.
        width: Grid width.
        height: Grid height.
        threshold: Threshold to binarize the environment (default = 0.5).
        diagonal: If True, use 8-connectivity (diagonal neighbors included).
            If False, use 4-connectivity (adjacent only).

    Returns:
        num_clusters: Number of clusters found.
        cluster_sizes: List of sizes of each cluster.
        labels: 2D array of labeled clusters.
    """
    env_grid = environment.reshape((height, width))
    binary = env_grid > threshold
    uf = UnionFind(width * height)

    def idx(x, y):
        return (y % height) * width + (x % width)  # periodic indexing

    # Define neighbor offsets
    neighbors = [(-1, 0), (0, -1), (1, 0), (0, 1)]  # 4-connectivity
    if diagonal:
        neighbors += [(-1, -1), (1, -1), (-1, 1), (1, 1)]  # 8-connectivity

    for y in range(height):
        for x in range(width):
            if not binary[y, x]:
                continue
            for dx, dy in neighbors:
                nx, ny = (x + dx) % width, (y + dy) % height
                if binary[ny, nx]:
                    uf.union(idx(x, y), idx(nx, ny))

    # Count clusters
    labels = -np.ones((height, width), dtype=int)
    label_map = {}
    next_label = 0
    for y in range(height):
        for x in range(width):
            if binary[y, x]:
                root = uf.find(idx(x, y))
                if root not in label_map:
                    label_map[root] = next_label
                    next_label += 1
                labels[y, x] = label_map[root]

    num_clusters = len(label_map)
    cluster_sizes = [np.sum(labels == i) for i in range(num_clusters)]

    return num_clusters, cluster_sizes, labels


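# Example (illustrative sketch): with periodic boundaries, high-value cells on
# opposite edges of a row wrap around and merge into one cluster.
#
#     env = np.zeros(16)
#     env[[0, 3]] = 1.0   # first and last cell of the top row of a 4x4 grid
#     n, sizes, labels = analyze_environment_clusters_periodic(
#         env, width=4, height=4, threshold=0.5
#     )
#     # n == 1 and sizes == [2]: columns 0 and 3 are neighbors under wrap-around.

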
def get_dominant_frequency_and_power(
    signal: np.ndarray, dt: float = 1.0
) -> tuple[float, float]:
    """Calculate the dominant frequency and its power in a time series signal.

    Args:
        signal (np.ndarray): 1D array of time series data.
        dt (float): Time step between samples in seconds.

    Returns:
        tuple[float, float]: Dominant frequency (Hz) and its power.
    """
    signal = signal - signal.mean()
    spectrum = np.abs(rfft(signal))
    freqs = rfftfreq(len(signal), d=dt)

    if len(spectrum) <= 1:
        return 0.0, 0.0

    dominant_idx = np.argmax(spectrum[1:]) + 1
    return freqs[dominant_idx], spectrum[dominant_idx] ** 2


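# Example (illustrative sketch): a pure 5 Hz sine sampled at 100 Hz for 2 s
# lands exactly on an FFT bin, so the dominant frequency comes out at 5.0 Hz.
#
#     t = np.arange(0, 2, 0.01)
#     sig = np.sin(2 * np.pi * 5 * t)
#     freq, power = get_dominant_frequency_and_power(sig, dt=0.01)
#     # freq == 5.0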