Source code for polyphy.lib.PolyPhy2D

import matplotlib.pyplot as plt
import numpy as np
import taichi as ti
from numpy.random import default_rng


[docs]def run_2D(): """Simulation-wide constants""" N_AGENTS = 1000000 DOMAIN_SCALE = 100.0 TRACE_RESOLUTION = (1024, 1024) DEPOSIT_DOWNSCALING_FACTOR = 4 DEPOSIT_RESOLUTION = (TRACE_RESOLUTION[0] // DEPOSIT_DOWNSCALING_FACTOR, TRACE_RESOLUTION[1] // DEPOSIT_DOWNSCALING_FACTOR) DOMAIN_SIZE = (DOMAIN_SCALE, DOMAIN_SCALE * np.float32(TRACE_RESOLUTION[1]) / np.float32(TRACE_RESOLUTION[0])) VIS_RESOLUTION = TRACE_RESOLUTION # Type aliases FLOAT_CPU = np.float32 INT_CPU = np.int32 FLOAT_GPU = ti.f32 INT_GPU = ti.i32 VEC2i = ti.types.vector(2, INT_GPU) VEC3i = ti.types.vector(2, INT_GPU) VEC2f = ti.types.vector(2, FLOAT_GPU) VEC3f = ti.types.vector(3, FLOAT_GPU) # Initializations ti.init(arch=ti.gpu) rng = default_rng() print('Number of agents:', N_AGENTS) print('Trace grid resolution:', TRACE_RESOLUTION) print('Deposit grid resolution:', DEPOSIT_RESOLUTION) print('Simulation domain size:', DOMAIN_SIZE) # Initialize agents agents = np.zeros(shape=(N_AGENTS, 4), dtype=FLOAT_CPU) agents[:, 0] = rng.uniform(low=0.0, high=DOMAIN_SIZE[0], size=agents.shape[0]) agents[:, 1] = rng.uniform(low=0.0, high=DOMAIN_SIZE[1], size=agents.shape[0]) agents[:, 2] = rng.uniform(low=0.0, high=2.0 * np.pi, size=agents.shape[0]) agents[:, 3] = 1.0 # TODO load input data for fitting # Allocate GPU memory fields agents_field = ti.Vector.field(n=4, dtype=FLOAT_GPU, shape=N_AGENTS) deposit_field = ti.Vector.field(n=2, dtype=FLOAT_GPU, shape=DEPOSIT_RESOLUTION) trace_field = ti.Vector.field(n=1, dtype=FLOAT_GPU, shape=TRACE_RESOLUTION) vis_field = ti.Vector.field(n=3, dtype=FLOAT_GPU, shape=VIS_RESOLUTION) # Define all GPU kernels and functions @ti.kernel def zero_field(f: ti.template()): for cell in ti.grouped(f): f[cell].fill(0.0) return @ti.kernel def copy_field(dst: ti.template(), src: ti.template()): for cell in ti.grouped(dst): dst[cell] = src[cell] return @ti.func def world_to_grid_2D(pos_world, size_world, size_grid) -> VEC2i: return ti.cast((pos_world / size_world) * ti.cast(size_grid, FLOAT_GPU), INT_GPU) @ti.func def angle_to_dir_2D(angle) -> VEC2f: return VEC2f(ti.cos(angle), ti.sin(angle)) @ti.kernel def propagation_step(sense_distance: FLOAT_GPU, sense_angle: FLOAT_GPU, steering_rate: FLOAT_GPU, step_size: FLOAT_GPU, weight_multiplier: FLOAT_GPU): for agent in ti.ndrange(agents_field.shape[0]): pos = VEC2f(0.0, 0.0) pos[0], pos[1], angle, weight = agents_field[agent] dir_fwd = angle_to_dir_2D(angle) angle_mut = angle + (ti.random(dtype=FLOAT_GPU) - 0.5) * sense_angle dir_mut = angle_to_dir_2D(angle_mut) # TODO deposit field ping pong deposit_fwd = deposit_field[world_to_grid_2D(pos + sense_distance * dir_fwd, VEC2f(DOMAIN_SIZE), VEC2i(DEPOSIT_RESOLUTION))][0] deposit_mut = deposit_field[world_to_grid_2D(pos + sense_distance * dir_mut, VEC2f(DOMAIN_SIZE), VEC2i(DEPOSIT_RESOLUTION))][0] # TODO domain wrapping angle_new = (angle) if (deposit_fwd > deposit_mut) else ( steering_rate * angle_mut + (1.0 - steering_rate) * angle) dir_new = angle_to_dir_2D(angle_new) pos_new = pos + step_size * dir_new agents_field[agent][0] = pos_new[0] agents_field[agent][1] = pos_new[1] agents_field[agent][2] = angle_new deposit_cell = world_to_grid_2D(pos_new, VEC2f(DOMAIN_SIZE), VEC2i(DEPOSIT_RESOLUTION)) deposit_field[deposit_cell][0] += weight_multiplier * weight trace_cell = world_to_grid_2D(pos_new, VEC2f(DOMAIN_SIZE), VEC2i(TRACE_RESOLUTION)) trace_field[trace_cell][0] += weight_multiplier * weight return @ti.kernel def relaxation_step_deposit(attenuation: FLOAT_GPU): for cell in ti.grouped(deposit_field): # TODO deposit diffusion deposit_field[cell][0] *= attenuation return @ti.kernel def relaxation_step_trace(attenuation: FLOAT_GPU): for cell in ti.grouped(trace_field): trace_field[cell][0] *= attenuation return @ti.kernel def render_visualization(): for x, y in ti.ndrange(vis_field.shape[0], vis_field.shape[1]): deposit_val = deposit_field[x * DEPOSIT_RESOLUTION[0] // VIS_RESOLUTION[0], y * DEPOSIT_RESOLUTION[1] // VIS_RESOLUTION[1]][0] trace_val = trace_field[x * TRACE_RESOLUTION[0] // VIS_RESOLUTION[0], y * TRACE_RESOLUTION[1] // VIS_RESOLUTION[1]] vis_field[x, y] = VEC3f(trace_val, trace_val, deposit_val if (trace_val < 0.1) else 0.0) return # Initialize GPU fields agents_field.from_numpy(agents) zero_field(deposit_field) zero_field(trace_field) zero_field(vis_field) # Main simulation & vis loop sense_distance = 1.0 sense_angle = 2.5 step_size = 0.1 attenuation = 0.95 weight_multiplier = 0.1 steering_rate = 0.5 window = ti.ui.Window('PolyPhy', (vis_field.shape[0], vis_field.shape[1]), show_window=True) canvas = window.get_canvas() while window.running: window.GUI.begin('Params', 0.0, 0.0, 0.6, 0.25) sense_distance = window.GUI.slider_float('Sense dist', sense_distance, 0.1, 10.0) sense_angle = window.GUI.slider_float('Sense angle', sense_angle, 0.1, 10.0) step_size = window.GUI.slider_float('Step size', step_size, 0.01, 0.5) attenuation = window.GUI.slider_float('Attenuation', attenuation, 0.9, 0.999) weight_multiplier = window.GUI.slider_float('Weight mul', weight_multiplier, 0.01, 1.0) window.GUI.end() propagation_step(sense_distance, sense_angle, steering_rate, step_size, weight_multiplier) relaxation_step_deposit(attenuation) relaxation_step_trace(attenuation) render_visualization() canvas.set_image(vis_field) window.show() window.destroy() deposit = deposit_field.to_numpy() trace = trace_field.to_numpy()
# TODO store resulting fields if __name__ == "__main__": run_2D()