from __future__ import annotations from dataclasses import dataclass, field import math import random import threading import time import uuid from typing import Dict, List, Optional, Tuple MAX_RUNS = 25 MAX_ROAD_SEGMENTS = 4000 DEFAULTS = { "width": 120.0, "height": 80.0, "residents": 60, "services": 4, "speed": 1.25, "service_radius": 3.5, "seed": None, "lat": 19.4326, "lon": -99.1332, "dist": 700.0, } @dataclass class Resident: id: int x: float y: float node_id: Optional[int] = None path_nodes: List[int] = field(default_factory=list) path_index: int = 0 edge_progress: float = 0.0 target_node_id: Optional[int] = None @dataclass class Service: id: int x: float y: float node_id: Optional[int] = None class SimulationRun: def __init__( self, run_id: str, width: float, height: float, residents: int, services: int, speed: float, service_radius: float, seed: Optional[int], use_roads: bool = False, road_network: Optional[Dict[str, object]] = None, road_meta: Optional[Dict[str, float]] = None, road_error: Optional[str] = None, ) -> None: road_network = road_network or {} road_nodes = road_network.get("nodes") or {} road_edges = road_network.get("edges") or [] road_width = road_network.get("width") road_height = road_network.get("height") road_bounds = road_network.get("bounds") if use_roads and road_nodes and road_width and road_height: width = float(road_width) height = float(road_height) self.run_id = run_id self.width = width self.height = height self.speed = speed self.service_radius = service_radius self.seed = seed self.use_roads = bool(use_roads and road_nodes) self.road_graph = road_network.get("graph") self.road_nodes = road_nodes self.road_edges = road_edges self.road_meta = road_meta or {} self.road_error = road_error self.road_bounds = road_bounds self.created_at = time.time() self.step_count = 0 self._lock = threading.Lock() rng = random.Random(seed) if self.use_roads: node_ids = list(self.road_nodes.keys()) if not node_ids: self.use_roads = False else: self.residents = [ _resident_on_node(i, rng.choice(node_ids), self.road_nodes) for i in range(residents) ] self.services = [ _service_on_node(i, rng.choice(node_ids), self.road_nodes) for i in range(services) ] if not self.use_roads: self.residents = [ Resident(i, rng.uniform(0, width), rng.uniform(0, height)) for i in range(residents) ] self.services = [ Service(i, rng.uniform(0, width), rng.uniform(0, height)) for i in range(services) ] def step(self, steps: int = 1) -> None: steps = max(1, steps) with self._lock: for _ in range(steps): if self.use_roads: self._step_once_roads() else: self._step_once() self.step_count += 1 def _step_once(self) -> None: for resident in self.residents: target, distance = self._nearest_service(resident) if target is None or distance <= self.service_radius: continue if distance <= 0: continue dx = target.x - resident.x dy = target.y - resident.y step_size = min(self.speed, distance) resident.x += (dx / distance) * step_size resident.y += (dy / distance) * step_size resident.x = _clamp(resident.x, 0.0, self.width) resident.y = _clamp(resident.y, 0.0, self.height) def _step_once_roads(self) -> None: if not self.road_graph or not self.road_nodes: return for resident in self.residents: if resident.node_id is None: continue if not resident.path_nodes or resident.path_index >= len(resident.path_nodes) - 1: self._assign_path(resident) if not resident.path_nodes or resident.path_index >= len(resident.path_nodes) - 1: continue self._advance_along_path(resident, self.speed) def _nearest_service(self, resident: Resident) -> Tuple[Optional[Service], float]: if not self.services: return None, float("inf") nearest = None min_dist = float("inf") for service in self.services: dist = math.hypot(service.x - resident.x, service.y - resident.y) if dist < min_dist: min_dist = dist nearest = service return nearest, min_dist def _assign_path(self, resident: Resident) -> None: if not self.road_graph or resident.node_id is None: return import networkx as nx best_service = None best_length = None for service in self.services: if service.node_id is None: continue try: length = nx.shortest_path_length( self.road_graph, resident.node_id, service.node_id, weight="length" ) except (nx.NetworkXNoPath, nx.NodeNotFound): continue if best_length is None or length < best_length: best_length = length best_service = service.node_id if best_service is None: resident.path_nodes = [] return try: path_nodes = nx.shortest_path( self.road_graph, resident.node_id, best_service, weight="length" ) except (nx.NetworkXNoPath, nx.NodeNotFound): resident.path_nodes = [] return resident.path_nodes = path_nodes resident.path_index = 0 resident.edge_progress = 0.0 resident.target_node_id = best_service def _advance_along_path(self, resident: Resident, distance: float) -> None: remaining = max(0.0, distance) while remaining > 0 and resident.path_index < len(resident.path_nodes) - 1: current_node = resident.path_nodes[resident.path_index] next_node = resident.path_nodes[resident.path_index + 1] p0 = self.road_nodes.get(current_node) p1 = self.road_nodes.get(next_node) if not p0 or not p1: resident.path_index += 1 resident.edge_progress = 0.0 resident.node_id = next_node continue edge_length = math.hypot(p1[0] - p0[0], p1[1] - p0[1]) if edge_length <= 0: resident.path_index += 1 resident.edge_progress = 0.0 resident.node_id = next_node resident.x, resident.y = p1 continue edge_remaining = edge_length - resident.edge_progress if remaining < edge_remaining: resident.edge_progress += remaining ratio = resident.edge_progress / edge_length resident.x = p0[0] + (p1[0] - p0[0]) * ratio resident.y = p0[1] + (p1[1] - p0[1]) * ratio remaining = 0.0 else: remaining -= edge_remaining resident.path_index += 1 resident.edge_progress = 0.0 resident.node_id = next_node resident.x, resident.y = p1 def metrics(self) -> Dict[str, float | int]: if not self.services: return { "served": 0, "avg_distance": 0.0, "max_distance": 0.0, } total_distance = 0.0 max_distance = 0.0 served = 0 for resident in self.residents: _, dist = self._nearest_service(resident) total_distance += dist max_distance = max(max_distance, dist) if dist <= self.service_radius: served += 1 avg_distance = total_distance / max(1, len(self.residents)) return { "served": served, "avg_distance": avg_distance, "max_distance": max_distance, } def state(self, include_roads: bool = True) -> Dict[str, object]: with self._lock: metrics = self.metrics() payload = { "run_id": self.run_id, "step": self.step_count, "world": { "width": self.width, "height": self.height, "mode": "roads" if self.use_roads else "grid", }, "counts": { "residents": len(self.residents), "services": len(self.services), }, "config": { "speed": self.speed, "service_radius": self.service_radius, "seed": self.seed, "use_roads": self.use_roads, }, "metrics": metrics, "residents": [ {"id": agent.id, "x": agent.x, "y": agent.y} for agent in self.residents ], "services": [ {"id": svc.id, "x": svc.x, "y": svc.y} for svc in self.services ], } if self.road_meta: payload["world"]["center"] = { "lat": self.road_meta.get("lat"), "lon": self.road_meta.get("lon"), "dist": self.road_meta.get("dist"), } if self.road_bounds: payload["world"]["bbox"] = self.road_bounds if self.road_error: payload["world"]["road_error"] = self.road_error if include_roads and self.road_edges: payload["roads"] = self.road_edges return payload _RUNS: Dict[str, SimulationRun] = {} _RUN_ORDER: List[str] = [] RUNS_LOCK = threading.Lock() def _resident_on_node(resident_id: int, node_id: int, node_positions: Dict[int, Tuple[float, float]]) -> Resident: x, y = node_positions[node_id] return Resident(resident_id, x, y, node_id=node_id) def _service_on_node(service_id: int, node_id: int, node_positions: Dict[int, Tuple[float, float]]) -> Service: x, y = node_positions[node_id] return Service(service_id, x, y, node_id=node_id) def _clamp(value: float, low: float, high: float) -> float: return max(low, min(high, value)) def _to_int(value: object, default: int) -> int: if value is None: return default try: return int(value) except (TypeError, ValueError): return default def _to_float(value: object, default: float) -> float: if value is None: return default try: return float(value) except (TypeError, ValueError): return default def _to_bool(value: object) -> bool: if isinstance(value, bool): return value if isinstance(value, (int, float)): return value != 0 if isinstance(value, str): return value.strip().lower() in {"1", "true", "yes", "on"} return False def _normalize_config(payload: Dict[str, object]) -> Dict[str, object]: width = _clamp(_to_float(payload.get("width"), DEFAULTS["width"]), 40.0, 500.0) height = _clamp(_to_float(payload.get("height"), DEFAULTS["height"]), 40.0, 500.0) residents = _clamp(_to_int(payload.get("residents"), DEFAULTS["residents"]), 5, 500) services = _clamp(_to_int(payload.get("services"), DEFAULTS["services"]), 1, 50) speed = _clamp(_to_float(payload.get("speed"), DEFAULTS["speed"]), 0.1, 8.0) service_radius = _clamp( _to_float(payload.get("service_radius"), DEFAULTS["service_radius"]), 0.5, 15.0 ) seed_value = payload.get("seed") seed = _to_int(seed_value, None) if seed_value is not None else None return { "width": width, "height": height, "residents": int(residents), "services": int(services), "speed": float(speed), "service_radius": float(service_radius), "seed": seed, } def _normalize_roads(payload: Dict[str, object]) -> Dict[str, float]: lat = _clamp(_to_float(payload.get("lat"), DEFAULTS["lat"]), -85.0, 85.0) lon = _clamp(_to_float(payload.get("lon"), DEFAULTS["lon"]), -180.0, 180.0) dist = _clamp(_to_float(payload.get("dist"), DEFAULTS["dist"]), 200.0, 2500.0) return { "lat": lat, "lon": lon, "dist": dist, } def _build_road_network(lat: float, lon: float, dist: float) -> Dict[str, object]: import osmnx as ox graph = ox.graph_from_point((lat, lon), dist=dist, network_type="drive", simplify=True) if graph is None or len(graph.nodes) == 0: raise ValueError("OSM graph is empty") graph = ox.project_graph(graph, to_crs="EPSG:3857").to_undirected() node_positions_raw: Dict[int, Tuple[float, float]] = {} for node_id, data in graph.nodes(data=True): if "x" in data and "y" in data: node_positions_raw[node_id] = (float(data["x"]), float(data["y"])) if not node_positions_raw: raise ValueError("OSM graph has no coordinates") xs = [pos[0] for pos in node_positions_raw.values()] ys = [pos[1] for pos in node_positions_raw.values()] min_x, max_x = min(xs), max(xs) min_y, max_y = min(ys), max(ys) width = max(max_x - min_x, 1.0) height = max(max_y - min_y, 1.0) node_positions = { node_id: (x - min_x, y - min_y) for node_id, (x, y) in node_positions_raw.items() } edges: List[Dict[str, float]] = [] for u, v, data in graph.edges(data=True): geom = data.get("geometry") if geom is not None: coords = list(geom.coords) for i in range(len(coords) - 1): x1, y1 = coords[i] x2, y2 = coords[i + 1] edges.append({ "x1": x1 - min_x, "y1": y1 - min_y, "x2": x2 - min_x, "y2": y2 - min_y, }) else: if u in node_positions and v in node_positions: x1, y1 = node_positions[u] x2, y2 = node_positions[v] edges.append({ "x1": x1, "y1": y1, "x2": x2, "y2": y2, }) if len(edges) > MAX_ROAD_SEGMENTS: step = max(1, int(len(edges) / MAX_ROAD_SEGMENTS)) edges = edges[::step] return { "graph": graph, "nodes": node_positions, "edges": edges, "width": width, "height": height, "bounds": { "min_x": min_x, "min_y": min_y, "max_x": max_x, "max_y": max_y, }, } def create_run(payload: Optional[Dict[str, object]] = None) -> SimulationRun: payload = payload or {} config = _normalize_config(payload) use_roads = _to_bool(payload.get("use_roads")) or payload.get("mode") == "roads" road_meta = _normalize_roads(payload) if use_roads else {} road_network = None road_error = None if use_roads: try: road_network = _build_road_network( road_meta["lat"], road_meta["lon"], road_meta["dist"] ) except Exception as exc: road_error = f"{type(exc).__name__}: {exc}" road_network = None use_roads = False run_id = uuid.uuid4().hex run = SimulationRun( run_id=run_id, width=config["width"], height=config["height"], residents=config["residents"], services=config["services"], speed=config["speed"], service_radius=config["service_radius"], seed=config["seed"], use_roads=use_roads, road_network=road_network, road_meta=road_meta, road_error=road_error, ) with RUNS_LOCK: _RUNS[run_id] = run _RUN_ORDER.append(run_id) while len(_RUN_ORDER) > MAX_RUNS: oldest = _RUN_ORDER.pop(0) _RUNS.pop(oldest, None) return run def get_run(run_id: str) -> Optional[SimulationRun]: if not run_id: return None with RUNS_LOCK: return _RUNS.get(run_id)