from __future__ import annotations

from datetime import datetime, timedelta, timezone
import math
import random
from typing import Dict, List, Optional, Tuple

from pxy_city_digital_twins.services.waste_routes import get_dispatch_data_for

# Waste streams tracked for every simulated bin and for the truck.
WASTE_TYPES = ("organic", "recyclable", "landfill")

# Capacity of a single bin per waste stream (units are not stated in this file).
BIN_TYPE_CAPACITY = {
    "organic": 8.0,
    "recyclable": 6.0,
    "landfill": 10.0,
}

# Capacity of the truck per waste stream; bin loads are compared directly
# against these values, so both tables share the same units.
TRUCK_CAPACITY = {
    "organic": 180.0,
    "recyclable": 140.0,
    "landfill": 220.0,
}

# Fallback values used by _normalize_params when a parameter is missing or
# cannot be parsed.
DEFAULTS = {
    "truck_speed_kmh": 25.0,
    "bins_per_stop": 3,
    "bin_radius_m": 60.0,
    "walk_speed_mps": 1.2,
    "window_min_s": 900.0,
    "window_max_s": 2400.0,
    "seed": None,
    "route_point_limit": 800,
}

# Four-component colour per bin status (presumably RGBA, 0-255 -- confirm
# against the CZML consumer).
STATUS_COLORS = {
    "served": [80, 200, 120, 220],
    "missed_time": [255, 90, 90, 220],
    "missed_capacity": [255, 170, 60, 220],
}


def _to_int(value: object, default: Optional[int]) -> Optional[int]:
    """Coerce ``value`` to ``int``, returning ``default`` when that fails.

    ``None``, unparsable strings, unsupported types, NaN and infinities all
    yield ``default`` instead of raising.
    """
    if value is None:
        return default
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError covers int(float("inf")), which is neither a
        # TypeError nor a ValueError.
        return default


def _to_float(value: object, default: float) -> float:
    """Coerce ``value`` to ``float``, returning ``default`` when that fails.

    ``None``, unparsable strings, unsupported types, integers too large for a
    float, and NaN all yield ``default``.
    """
    if value is None:
        return default
    try:
        number = float(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError covers float() of an integer beyond the float range.
        return default
    # NaN defeats ordering comparisons, so _clamp would silently turn it into
    # its upper bound; treat it as unparsable instead.
    if math.isnan(number):
        return default
    return number


def _clamp(value: float, low: float, high: float) -> float:
    """Return ``value`` limited to the closed interval ``[low, high]``."""
    return max(low, min(high, value))


def _normalize_params(params: Dict[str, object]) -> Dict[str, object]:
    """Build a complete, range-checked parameter set for the simulation.

    Every key of ``DEFAULTS`` is present in the result. Missing or unparsable
    values fall back to ``DEFAULTS``; numeric values are clamped to fixed
    ranges. ``window_max_s`` is never smaller than ``window_min_s``. ``seed``
    is an ``int`` or ``None``.
    """
    truck_speed_kmh = _clamp(
        _to_float(params.get("truck_speed_kmh"), DEFAULTS["truck_speed_kmh"]),
        5.0,
        60.0,
    )
    bins_per_stop = _clamp(
        _to_int(params.get("bins_per_stop"), DEFAULTS["bins_per_stop"]),
        1,
        10,
    )
    bin_radius_m = _clamp(
        _to_float(params.get("bin_radius_m"), DEFAULTS["bin_radius_m"]),
        10.0,
        250.0,
    )
    walk_speed_mps = _clamp(
        _to_float(params.get("walk_speed_mps"), DEFAULTS["walk_speed_mps"]),
        0.5,
        3.0,
    )
    window_min_s = _clamp(
        _to_float(params.get("window_min_s"), DEFAULTS["window_min_s"]),
        300.0,
        3600.0,
    )
    # The lower bound is the (already clamped) minimum window, so the pair
    # always forms a valid range.
    window_max_s = _clamp(
        _to_float(params.get("window_max_s"), DEFAULTS["window_max_s"]),
        window_min_s,
        7200.0,
    )
    # An absent or unparsable seed stays None (the default).
    seed = _to_int(params.get("seed"), DEFAULTS["seed"])
    route_point_limit = _clamp(
        _to_int(params.get("route_point_limit"), DEFAULTS["route_point_limit"]),
        200,
        3000,
    )
    return {
        "truck_speed_kmh": truck_speed_kmh,
        "bins_per_stop": int(bins_per_stop),
        "bin_radius_m": float(bin_radius_m),
        "walk_speed_mps": float(walk_speed_mps),
        "window_min_s": float(window_min_s),
        "window_max_s": float(window_max_s),
        "seed": seed,
        "route_point_limit": int(route_point_limit),
    }


def _haversine_m(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Return the great-circle distance in metres between two lat/lon points.

    Inputs are in degrees; the sphere radius used is 6 371 000 m.
    """
    r = 6371000.0
    dlat = math.radians(lat2 - lat1)
    dlon = math.radians(lon2 - lon1)
    a = (
        math.sin(dlat / 2) ** 2
        + math.cos(math.radians(lat1))
        * math.cos(math.radians(lat2))
        * math.sin(dlon / 2) ** 2
    )
    # Rounding can push ``a`` a hair outside [0, 1] (near-antipodal points),
    # which would make sqrt(1 - a) raise a math domain error.
    a = min(1.0, max(0.0, a))
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return r * c


def _lonlat_to_mercator(lon: float, lat: float) -> Tuple[float, float]:
    """Project lon/lat in degrees to spherical Mercator ``(x, y)``.

    Uses a sphere of radius 6 378 137 m.
    """
    r = 6378137.0
    x = r * math.radians(lon)
    y = r * math.log(math.tan(math.pi / 4 + math.radians(lat) / 2))
    return x, y


def _mercator_to_lonlat(x: float, y: float) -> Tuple[float, float]:
    """Inverse of ``_lonlat_to_mercator``: return ``(lon, lat)`` in degrees."""
    r = 6378137.0
    lon = math.degrees(x / r)
    lat = math.degrees(2 * math.atan(math.exp(y / r)) - math.pi / 2)
    return lon, lat


def _offset_lonlat(lon: float, lat: float, dx: float, dy: float) -> Tuple[float, float]:
    """Move a lon/lat point by ``dx`` metres east and ``dy`` metres north.

    The shift is applied in Mercator space. Mercator stretches lengths by
    ``1 / cos(latitude)``, so the offsets are scaled by that factor first;
    otherwise the point would move only ``cos(latitude)`` times the requested
    ground distance.
    """
    cos_lat = math.cos(math.radians(lat))
    # Right at the poles the scale factor diverges; fall back to the unscaled
    # offset there rather than overflowing.
    scale = 1.0 / cos_lat if cos_lat > 1e-9 else 1.0
    x, y = _lonlat_to_mercator(lon, lat)
    return _mercator_to_lonlat(x + dx * scale, y + dy * scale)


def _nearest_index(lon: float, lat: float, coords: List[Tuple[float, float]]) -> int:
    """Return the index of the ``(lon, lat)`` entry in ``coords`` nearest to the point.

    Ties resolve to the earliest index; an empty ``coords`` yields ``0``.
    """
    return min(
        range(len(coords)),
        key=lambda idx: _haversine_m(lat, lon, coords[idx][1], coords[idx][0]),
        default=0,
    )


def _downsample_coords(coords: List[Tuple[float, float]], limit: int) -> List[Tuple[float, float]]:
    """Thin ``coords`` to at most ``limit`` points, keeping both endpoints.

    Points are taken at a constant stride and the final coordinate is appended
    when the stride does not land on it. Because the endpoints are always
    kept, a ``limit`` below 2 is treated as 2. When no thinning is needed the
    input list itself is returned.
    """
    budget = max(2, limit)
    count = len(coords)
    if count <= budget:
        return coords
    # Ceiling stride over the count - 1 segments: flooring len / limit gives a
    # stride of 1 whenever count < 2 * limit and would drop nothing.
    step = math.ceil((count - 1) / (budget - 1))
    sampled = coords[::step]
    if sampled[-1] != coords[-1]:
        sampled.append(coords[-1])
    return sampled


def build_waste_cesium_payload(subdivision: str, vehicle: str, params: Optional[Dict[str, object]] = None) -> Dict[str, object]: params = params or {}
config = _normalize_params(params) rng = random.Random(config["seed"]) dispatch_data = get_dispatch_data_for(subdivision) if not dispatch_data: raise ValueError("No dispatch data found for subdivision") route = next((r for r in dispatch_data if str(r.get("route_id")) == str(vehicle)), None) if not route: raise ValueError("Route not found for vehicle") coords_raw = route.get("coords") or [] steps = route.get("steps") or [] if not coords_raw and steps: coords_raw = [step.get("position") for step in steps if step.get("position")] if not coords_raw or len(coords_raw) < 2: raise ValueError("Route geometry is missing") coords_full = [(float(lon), float(lat)) for lon, lat in coords_raw] cumulative = [0.0] for i in range(len(coords_full) - 1): lon1, lat1 = coords_full[i] lon2, lat2 = coords_full[i + 1] cumulative.append(cumulative[-1] + _haversine_m(lat1, lon1, lat2, lon2)) total_distance = cumulative[-1] truck_speed_mps = config["truck_speed_kmh"] / 3.6 if truck_speed_mps <= 0 or total_distance <= 0: raise ValueError("Route distance or speed is invalid") total_time_s = total_distance / truck_speed_mps start_time = datetime.now(timezone.utc) end_time = start_time + timedelta(seconds=total_time_s) coords_czml = _downsample_coords(coords_full, config["route_point_limit"]) indices_lookup = {coords_full[idx]: idx for idx in range(len(coords_full))} positions = [] for lon, lat in coords_czml: idx = indices_lookup.get((lon, lat)) if idx is None: idx = _nearest_index(lon, lat, coords_full) t = cumulative[idx] / truck_speed_mps positions.extend([t, lon, lat, 0]) stop_positions = [] if steps: for step in steps: pos = step.get("position") if not pos: continue stop_positions.append((float(pos[0]), float(pos[1]), step.get("step_type") or "job", step.get("popup") or "")) else: stride = max(1, int(len(coords_full) / 12)) for idx in range(0, len(coords_full), stride): lon, lat = coords_full[idx] stop_positions.append((lon, lat, "job", "")) stop_info = [] for stop_idx, (lon, lat, 
step_type, popup) in enumerate(stop_positions): coord_idx = _nearest_index(lon, lat, coords_full) arrival_s = cumulative[coord_idx] / truck_speed_mps stop_info.append({ "id": f"stop-{stop_idx}", "lon": lon, "lat": lat, "arrival_s": arrival_s, "step_type": step_type, "popup": popup, }) bins = [] for stop in stop_info: for _ in range(config["bins_per_stop"]): angle = rng.uniform(0, math.pi * 2) radius = rng.uniform(5.0, config["bin_radius_m"]) dx = math.cos(angle) * radius dy = math.sin(angle) * radius lon, lat = _offset_lonlat(stop["lon"], stop["lat"], dx, dy) ready_time = rng.uniform(0.0, total_time_s * 0.8) window_s = rng.uniform(config["window_min_s"], config["window_max_s"]) walk_time = radius / config["walk_speed_mps"] waste = {} for wtype in WASTE_TYPES: fullness = rng.uniform(0.2, 1.0) waste[wtype] = round(fullness * BIN_TYPE_CAPACITY[wtype], 2) arrival_s = stop["arrival_s"] on_time = arrival_s >= (ready_time + walk_time) and arrival_s <= (ready_time + window_s) status = "served" if on_time else "missed_time" bins.append({ "id": f"bin-{len(bins)}", "lon": lon, "lat": lat, "arrival_s": arrival_s, "ready_s": ready_time, "window_s": window_s, "walk_s": walk_time, "distance_m": radius, "status": status, "waste": waste, }) remaining = TRUCK_CAPACITY.copy() bins_sorted = sorted(bins, key=lambda b: b["arrival_s"]) for b in bins_sorted: if b["status"] != "served": continue fits = True for wtype, amount in b["waste"].items(): if amount > remaining[wtype]: fits = False break if fits: for wtype, amount in b["waste"].items(): remaining[wtype] -= amount else: b["status"] = "missed_capacity" total_bins = len(bins) served = sum(1 for b in bins if b["status"] == "served") missed_time = sum(1 for b in bins if b["status"] == "missed_time") missed_capacity = sum(1 for b in bins if b["status"] == "missed_capacity") waste_total = {wtype: 0.0 for wtype in WASTE_TYPES} waste_collected = {wtype: 0.0 for wtype in WASTE_TYPES} for b in bins: for wtype, amount in b["waste"].items(): 
waste_total[wtype] += amount if b["status"] == "served": waste_collected[wtype] += amount metrics = { "bins_total": total_bins, "bins_served": served, "bins_missed_time": missed_time, "bins_missed_capacity": missed_capacity, "waste_total": waste_total, "waste_collected": waste_collected, "route_km": round(total_distance / 1000.0, 2), "route_minutes": round(total_time_s / 60.0, 1), } interval = f"{start_time.isoformat()}/{end_time.isoformat()}" czml = [ { "id": "document", "name": "Waste Simulation", "version": "1.0", "clock": { "interval": interval, "currentTime": start_time.isoformat(), "multiplier": 10, "range": "LOOP_STOP", "step": "SYSTEM_CLOCK_MULTIPLIER", }, }, { "id": "truck", "availability": interval, "position": { "epoch": start_time.isoformat(), "cartographicDegrees": positions, }, "point": { "pixelSize": 10, "color": {"rgba": [255, 80, 80, 255]}, "outlineColor": {"rgba": [20, 20, 20, 255]}, "outlineWidth": 1, }, "path": { "material": { "polylineOutline": { "color": {"rgba": [255, 80, 80, 160]}, "outlineColor": {"rgba": [0, 0, 0, 40]}, "outlineWidth": 1, } }, "width": 3, "leadTime": 0, "trailTime": total_time_s, "resolution": 5, }, }, ] for stop in stop_info: czml.append({ "id": stop["id"], "position": {"cartographicDegrees": [stop["lon"], stop["lat"], 0]}, "point": { "pixelSize": 6, "color": {"rgba": [60, 160, 255, 200]}, }, "properties": { "arrival_s": stop["arrival_s"], "step_type": stop["step_type"], }, }) for b in bins: color = STATUS_COLORS.get(b["status"], [200, 200, 200, 200]) desc = ( f"
| Status | {b['status']} |
|---|---|
| Arrival | {b['arrival_s'] / 60.0:.1f} min |
| Ready | {b['ready_s'] / 60.0:.1f} min |
| Walk | {b['walk_s'] / 60.0:.1f} min |
| Organic | {b['waste']['organic']} |
| Recyclable | {b['waste']['recyclable']} |
| Landfill | {b['waste']['landfill']} |