Ekaropolus c8034ee5d2
Some checks are pending
continuous-integration/drone/push Build is running
Digital Twins ver 2.0 with Celium and Roads
2026-01-03 01:03:22 -06:00

270 lines
8.6 KiB
Python

import osmnx as ox
import shapely
import random
import uuid
import networkx as nx
from matplotlib import cm
import math
from shapely.geometry import LineString, MultiLineString
def generate_osm_city_data(lat, lon, dist=400, scale=1.0):
print(f"🏙️ Fetching OSM buildings and network at ({lat}, {lon})")
scale_factor = scale
status_options = ["OK", "Warning", "Critical", "Offline"]
road_types = {
"motorway",
"trunk",
"primary",
"secondary",
"tertiary",
"residential",
"unclassified",
"service",
"living_street",
}
road_widths = {
"motorway": 8.0,
"trunk": 7.0,
"primary": 6.0,
"secondary": 5.0,
"tertiary": 4.0,
"residential": 3.0,
}
road_colors = {
"motorway": "#00ffff",
"trunk": "#00ff99",
"primary": "#ff00ff",
"secondary": "#ff8800",
"tertiary": "#ffe600",
"residential": "#00aaff",
"unclassified": "#66ffcc",
"service": "#ff66cc",
"living_street": "#66ff66",
}
default_road_width = 3.0
min_segment_len = 1.0
# ————— STREET NETWORK —————
G = ox.graph_from_point((lat, lon), dist=dist, network_type='drive').to_undirected()
degree = dict(G.degree())
max_degree = max(degree.values()) if degree else 1
color_map = cm.get_cmap("plasma")
nodes_gdf, edge_gdf = ox.graph_to_gdfs(G, nodes=True, edges=True)
nodes_gdf = nodes_gdf.to_crs(epsg=3857)
edge_gdf = edge_gdf.to_crs(epsg=3857)
# ————— BUILDINGS —————
tags = {"building": True}
gdf = ox.features_from_point((lat, lon), tags=tags, dist=dist)
gdf = gdf[gdf.geometry.type.isin(["Polygon", "MultiPolygon"])].to_crs(epsg=3857)
gdf["centroid"] = gdf.geometry.centroid
raw_buildings = []
for i, row in gdf.iterrows():
centroid = row["centroid"]
polygon = row["geometry"]
building_id = f"BLD-{uuid.uuid4().hex[:6].upper()}"
status = random.choice(status_options)
try:
height = float(row.get("height", None))
except:
height = float(row.get("building:levels", 3)) * 3.2 if row.get("building:levels") else 10.0
try:
node = ox.distance.nearest_nodes(G, X=centroid.x, Y=centroid.y)
node_degree = degree.get(node, 0)
except:
node_degree = 0
norm_value = node_degree / max_degree
rgba = color_map(norm_value)
hex_color = '#%02x%02x%02x' % tuple(int(c * 255) for c in rgba[:3])
raw_buildings.append({
"id": building_id,
"raw_x": centroid.x,
"raw_z": centroid.y,
"width": polygon.bounds[2] - polygon.bounds[0],
"depth": polygon.bounds[3] - polygon.bounds[1],
"height": height,
"color": hex_color,
"status": status,
})
raw_roads = []
raw_road_paths = []
for _, row in edge_gdf.iterrows():
hwy = row.get("highway")
if isinstance(hwy, (list, tuple)) and hwy:
hwy = hwy[0]
if hwy and hwy not in road_types:
continue
geom = row.get("geometry")
lines = []
if isinstance(geom, LineString):
lines = [geom]
elif isinstance(geom, MultiLineString):
lines = list(geom.geoms)
else:
u = row.get("u")
v = row.get("v")
if u is not None and v is not None and u in nodes_gdf.index and v in nodes_gdf.index:
nu = nodes_gdf.loc[u].geometry
nv = nodes_gdf.loc[v].geometry
if nu is not None and nv is not None:
lines = [LineString([(nu.x, nu.y), (nv.x, nv.y)])]
if not lines:
continue
width = road_widths.get(hwy, default_road_width)
color = road_colors.get(hwy, "#666666")
for line in lines:
coords = list(line.coords)
if len(coords) >= 2:
raw_road_paths.append({
"coords": coords,
"width": width,
"color": color,
})
for i in range(len(coords) - 1):
x1, z1 = coords[i]
x2, z2 = coords[i + 1]
raw_roads.append({
"raw_x1": x1,
"raw_z1": z1,
"raw_x2": x2,
"raw_z2": z2,
"width": width,
"color": color,
})
# ————— CENTER AND SCALE —————
if raw_buildings:
avg_x = sum(b['raw_x'] for b in raw_buildings) / len(raw_buildings)
avg_z = sum(b['raw_z'] for b in raw_buildings) / len(raw_buildings)
else:
avg_x = 0.0
avg_z = 0.0
if not raw_buildings and raw_roads:
avg_x = sum((r["raw_x1"] + r["raw_x2"]) / 2 for r in raw_roads) / len(raw_roads)
avg_z = sum((r["raw_z1"] + r["raw_z2"]) / 2 for r in raw_roads) / len(raw_roads)
if raw_buildings:
buildings = [{
"id": b['id'],
"position_x": (b['raw_x'] - avg_x) * scale_factor,
"position_z": (b['raw_z'] - avg_z) * scale_factor,
"width": b['width'] * scale_factor,
"depth": b['depth'] * scale_factor,
"height": b['height'] * scale_factor,
"color": b['color'],
"status": b['status'],
} for b in raw_buildings]
else:
buildings = []
roads = []
for r in raw_roads:
x1 = (r["raw_x1"] - avg_x) * scale_factor
z1 = (r["raw_z1"] - avg_z) * scale_factor
x2 = (r["raw_x2"] - avg_x) * scale_factor
z2 = (r["raw_z2"] - avg_z) * scale_factor
dx = x2 - x1
dz = z2 - z1
length = math.hypot(dx, dz)
if length < min_segment_len * scale_factor:
continue
yaw = math.degrees(math.atan2(dz, dx))
roads.append({
"x": (x1 + x2) / 2,
"z": (z1 + z2) / 2,
"x1": x1,
"z1": z1,
"x2": x2,
"z2": z2,
"length": length,
"width": max(r["width"] * scale_factor, 0.2),
"yaw": yaw,
"color": r["color"],
})
road_paths = []
for rp in raw_road_paths:
pts = [{
"x": (x - avg_x) * scale_factor,
"z": (z - avg_z) * scale_factor,
} for x, z in rp["coords"]]
if len(pts) < 2:
continue
if len(pts) > 200:
last_pt = pts[-1]
step = max(1, int(len(pts) / 200))
pts = pts[::step]
if pts[-1] != last_pt:
pts.append(last_pt)
road_paths.append({
"points": pts,
"width": max(rp["width"] * scale_factor, 0.2),
"color": rp["color"],
})
if roads:
xs = [r["x"] for r in roads]
zs = [r["z"] for r in roads]
print(
"🛣️ Roads: segments=%d bbox=(%.1f, %.1f)-(%.1f, %.1f)"
% (len(roads), min(xs), min(zs), max(xs), max(zs))
)
return {"buildings": buildings, "roads": roads, "road_paths": road_paths}
def generate_osm_road_midpoints_only(lat, lon, dist=400, scale=1.0):
import osmnx as ox
import networkx as nx
from shapely.geometry import LineString
import geopandas as gpd
import random
print(f"🛣️ Fetching road midpoints only at ({lat}, {lon})")
# Obtener grafo y convertir a GeoDataFrame con geometría
G = ox.graph_from_point((lat, lon), dist=dist, network_type='drive').to_undirected()
edge_gdf = ox.graph_to_gdfs(G, nodes=False, edges=True)
# Proyectar a metros (3857)
edge_gdf = edge_gdf.to_crs(epsg=3857)
midpoints_raw = []
for _, row in edge_gdf.iterrows():
geom = row.get('geometry', None)
if isinstance(geom, LineString) and geom.length > 0:
midpoint = geom.interpolate(0.5, normalized=True)
midpoints_raw.append((midpoint.x, midpoint.y))
if not midpoints_raw:
return {"roads": []}
avg_x = sum(x for x, _ in midpoints_raw) / len(midpoints_raw)
avg_z = sum(z for _, z in midpoints_raw) / len(midpoints_raw)
midpoints = [{
"id": f"RD-{i}",
"position_x": (x - avg_x) * scale,
"position_z": (z - avg_z) * scale,
"status": random.choice(["OK", "Warning", "Critical", "Offline"])
} for i, (x, z) in enumerate(midpoints_raw)]
# DEBUG
for i in range(min(5, len(midpoints))):
print(f"🧭 Road {i}: x={midpoints[i]['position_x']:.2f}, z={midpoints[i]['position_z']:.2f}")
return {"roads": midpoints}