Ekaropolus 0eb2b393f2
All checks were successful
continuous-integration/drone/push Build is passing
SAMI Functionality add
2025-09-16 16:18:45 -06:00

724 lines
29 KiB
Python

# pxy_sites/services/site_scoring.py
from __future__ import annotations
import os, json, uuid, random, math
from typing import List, Tuple, Optional, Dict, Any
from datetime import datetime
from django.conf import settings
from pyproj import Geod
# Headless backend for matplotlib
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
from matplotlib.patches import Polygon as MplPolygon
import numpy as np
from shapely.geometry import Point, Polygon
from scipy.stats import gaussian_kde
from pxy_contracts.contracts import (
SiteSearchRequest, SiteSearchResponse,
CandidateSite, ScoreBreakdown
)
from pxy_routing.services import get_routing_provider
from pxy_de.providers.base import get_provider
# --------------------------- Geometric helpers ---------------------------
def _isochrone_area_km2(feature: dict) -> float:
geom = (feature or {}).get("geometry") or {}
if geom.get("type") != "Polygon":
return 0.0
rings = geom.get("coordinates") or []
if not rings:
return 0.0
coords = rings[0]
if len(coords) < 4:
return 0.0
geod = Geod(ellps="WGS84")
lons = [float(x[0]) for x in coords]
lats = [float(x[1]) for x in coords]
area_m2, _ = geod.polygon_area_perimeter(lons, lats)
return abs(area_m2) / 1_000_000.0 # m² -> km²
def _polygon_from_feature(feature: dict) -> Optional[Polygon]:
geom = (feature or {}).get("geometry") or {}
if geom.get("type") != "Polygon":
return None
coords = geom.get("coordinates")
if not coords or not coords[0]:
return None
try:
ring = [(float(x[0]), float(x[1])) for x in coords[0]]
if len(ring) < 4:
return None
return Polygon(ring)
except Exception:
return None
def _extent_from_iso_list(iso_list: List[dict]) -> Optional[Tuple[float, float, float, float]]:
xs, ys = [], []
for item in iso_list or []:
feat = item.get("feature") or {}
geom = feat.get("geometry") or {}
if geom.get("type") != "Polygon":
continue
coords = geom.get("coordinates") or []
if not coords:
continue
ring = coords[0]
for x, y in ring:
xs.append(float(x)); ys.append(float(y))
if not xs or not ys:
return None
return (min(xs), min(ys), max(xs), max(ys))
def _iso_entry(minutes: int, feat: dict) -> dict:
    """Wrap one provider isochrone feature into the module's isochrone record."""
    area_km2 = float(_isochrone_area_km2(feat))
    props = {"minutes": int(minutes), "area_km2": area_km2}
    feature = {"type": "Feature", "geometry": feat.get("geometry"), "properties": props}
    return {"minutes": int(minutes), "feature": feature, "area_km2": area_km2}


def _build_isochrones(center: Tuple[float, float], time_bands: List[int]) -> List[dict]:
    """
    Build isochrones for the requested minute bands.

    - If the routing provider supports `isochrones(center, minutes_list)`, use it once
      (reduces ORS requests and rate-limit pressure).
    - Otherwise, fall back to one call per band.

    Output schema stays the same as before: a list of dicts with
    {"minutes": int, "feature": Feature(Polygon), "area_km2": float}

    The batched result is assembled in its own list and only returned when the
    whole batch path succeeds. If anything in that path raises, the partial
    result is discarded so the per-band fallback never yields duplicated bands.
    Errors raised by the per-band fallback itself propagate to the caller.
    """
    rp = get_routing_provider()
    bands: List[int] = [int(m) for m in (time_bands or [])]

    # Try a single batched call first
    if hasattr(rp, "isochrones"):
        try:
            feats = rp.isochrones(center, bands)  # expected same order as requested bands
            n = min(len(bands), len(feats))
            batched = [_iso_entry(m, feat) for m, feat in zip(bands[:n], feats[:n])]
            # If provider returned fewer features than requested, fill the rest via single calls
            for m in bands[n:]:
                batched.append(_iso_entry(m, rp.isochrone(center, m)))
            return batched
        except Exception:
            # Deliberate best-effort: fall back to per-band calls below if the
            # batch path fails for any reason; `batched` is simply dropped.
            pass

    # Fallback: one request per band (original behavior)
    return [_iso_entry(m, rp.isochrone(center, m)) for m in bands]
def _access_from_iso_list(iso_list: List[dict]) -> Tuple[float, List[str]]:
if not iso_list:
return 0.0, ["no_isochrones"]
areas = [item["area_km2"] for item in iso_list]
max_a = max(areas) if areas else 0.0
if max_a <= 0:
return 0.0, [f"{item['minutes']} min area ≈ 0.0 km²" for item in iso_list]
norms = [a / max_a for a in areas]
access = sum(norms) / len(norms)
reasons = [f"{item['minutes']} min area ≈ {item['area_km2']:.1f} km²" for item in iso_list]
return float(access), reasons
# --------------------------- Data-driven scores ---------------------------
def _competition_from_pois(city: str, business: str, iso_list: List[dict]) -> Tuple[float, List[str]]:
    """Score competition from the density of POIs inside the largest-minutes isochrone.

    The score is ``1 / (1 + density / D_ref)`` clamped to ``[0, 1]``, where
    ``density`` is competitors per km² and ``D_ref`` comes from the
    ``COMP_REF_DENSITY`` environment variable (default ``5.0``), so a denser
    competitor field yields a lower score.

    Returns ``(score, reasons)``. A neutral ``0.5`` with a single
    ``competition_fallback: ...`` reason is returned when the provider call
    fails, when there are no POIs or isochrones, or when the polygon or area
    is unusable.
    """
    provider = get_provider()
    try:
        pois = provider.denue(city, business)  # DataFrame[name,lat,lon,category]
    except Exception as err:
        return 0.5, [f"competition_fallback: provider_error={err}"]
    if pois.empty or not iso_list:
        return 0.5, ["competition_fallback: no_pois_or_isochrones"]

    widest = max(iso_list, key=lambda band: band["minutes"])
    catchment = _polygon_from_feature(widest["feature"])
    if catchment is None:
        return 0.5, ["competition_fallback: invalid_polygon"]
    area_km2 = float(widest.get("area_km2") or 0.0)
    if area_km2 <= 0.0:
        return 0.5, ["competition_fallback: zero_area"]

    def _is_inside(row) -> bool:
        # Rows with unusable coordinates are skipped rather than failing the score.
        try:
            return bool(catchment.contains(Point(float(row.lon), float(row.lat))))
        except Exception:
            return False

    competitors = sum(1 for row in pois.itertuples(index=False) if _is_inside(row))
    density = competitors / area_km2  # POIs per km²
    ref_density = float(os.getenv("COMP_REF_DENSITY", "5.0"))
    comp = float(max(0.0, min(1.0, 1.0 / (1.0 + density / ref_density))))
    return comp, [
        f"largest_band: {widest['minutes']} min, area ≈ {area_km2:.1f} km²",
        f"competitors_inside: {competitors}, density ≈ {density:.2f} /km²",
        f"competition_score = 1/(1 + density/{ref_density:.1f}) ≈ {comp:.2f}",
    ]
def _demand_from_popgrid(city: str, iso_list: List[dict]) -> Tuple[float, List[str]]:
    """Score demand from the population falling inside the largest-minutes isochrone.

    The score is the saturating ratio ``pop / (pop + P_ref)`` clamped to
    ``[0, 1]``, with ``P_ref`` read from the ``DEMAND_REF_POP`` environment
    variable (default ``50000``).

    Returns ``(score, reasons)``. A neutral ``0.5`` with a single
    ``demand_fallback: ...`` reason is returned when the provider call fails,
    when the grid or isochrone list is empty, or when the polygon or area is
    unusable.
    """
    prov = get_provider()
    try:
        grid = prov.popgrid(city)  # DataFrame[cell_id, lat, lon, pop]
    except Exception as e:
        return 0.5, [f"demand_fallback: provider_error={e}"]
    if grid.empty or not iso_list:
        return 0.5, ["demand_fallback: no_grid_or_isochrones"]

    largest = max(iso_list, key=lambda x: x["minutes"])
    poly = _polygon_from_feature(largest["feature"])
    if poly is None:
        return 0.5, ["demand_fallback: invalid_polygon"]
    area_km2 = float(largest.get("area_km2") or 0.0)
    if area_km2 <= 0.0:
        return 0.5, ["demand_fallback: zero_area"]

    total_pop = 0.0
    for row in grid.itertuples(index=False):
        try:
            pop = float(row.pop)
            # A NaN/inf cell would poison the running total and later make
            # int(total_pop) raise while formatting the reasons, so skip it.
            if not math.isfinite(pop):
                continue
            if poly.contains(Point(float(row.lon), float(row.lat))):
                total_pop += pop
        except Exception:
            # Best-effort: rows with unusable values are ignored.
            continue

    density = total_pop / area_km2  # area_km2 > 0 is guaranteed by the guard above
    P_ref = float(os.getenv("DEMAND_REF_POP", "50000"))
    demand = total_pop / (total_pop + P_ref) if (total_pop + P_ref) > 0 else 0.0
    demand = float(max(0.0, min(1.0, demand)))
    reasons = [
        f"largest_band: {largest['minutes']} min, area ≈ {area_km2:.1f} km²",
        f"population_inside ≈ {int(total_pop)}, density ≈ {density:.1f} /km²",
        f"demand_score = pop/(pop+{int(P_ref)}) ≈ {demand:.2f}",
    ]
    return demand, reasons
# --------------------------- Sampling and main map ---------------------------
def _sample_points_in_polygon(poly: Polygon, n: int, rng: random.Random) -> List[Tuple[float, float]]:
minx, miny, maxx, maxy = poly.bounds
pts: List[Tuple[float, float]] = []
max_tries = n * 50
tries = 0
while len(pts) < n and tries < max_tries:
tries += 1
x = rng.uniform(minx, maxx)
y = rng.uniform(miny, maxy)
if poly.contains(Point(x, y)):
pts.append((y, x)) # (lat, lon)
return pts
def _km_per_deg_lon(lat_deg: float) -> float:
return 111.320 * math.cos(math.radians(lat_deg))
def _km_per_deg_lat() -> float:
return 110.574
def _save_sites_map(center: Tuple[float, float], iso_list_for_map: List[dict],
                    search_id: str, city: str, business: str,
                    top_candidates: List[Tuple[float, float, float]]) -> str | None:
    """Render the ranked-candidate map as a PNG and return its media URL.

    Draws every Polygon isochrone of ``iso_list_for_map`` (largest band
    first), the ``center`` marker, one numbered marker per entry of
    ``top_candidates``, a "Top-K" text box and a 5 km scale bar, then saves the
    figure to ``settings.MEDIA_ROOT / "sites" / f"sites_{search_id}.png"``.

    Args:
        center: ``(lat, lon)`` of the reference point.
        iso_list_for_map: isochrone records with ``"minutes"``, ``"feature"``
            and ``"area_km2"`` keys.
        search_id: identifier embedded in the output file name.
        city: used in the plot title only.
        business: used in the plot title only.
        top_candidates: ``(lat, lon, score)`` tuples, plotted in the given order.

    Returns:
        ``f"{settings.MEDIA_URL}sites/<file name>"`` on success, or ``None`` if
        any step fails (rendering is best-effort). The figure is closed on both
        paths so a failed render does not leave an open pyplot figure behind.
    """
    fig = None
    try:
        media_dir = settings.MEDIA_ROOT / "sites"
        media_dir.mkdir(parents=True, exist_ok=True)
        out_path = media_dir / f"sites_{search_id}.png"

        # Collect polygons and the coordinates that define the plot extent
        lons, lats = [center[1]], [center[0]]
        polys = []
        for item in iso_list_for_map:
            feat = item["feature"]
            geom = feat.get("geometry") or {}
            if geom.get("type") != "Polygon":
                continue
            coords = geom.get("coordinates")[0]
            poly_xy = [(float(x[0]), float(x[1])) for x in coords]
            polys.append({"minutes": item["minutes"], "coords": poly_xy, "area": item["area_km2"]})
            lons.extend([p[0] for p in poly_xy])
            lats.extend([p[1] for p in poly_xy])

        fig, ax = plt.subplots(figsize=(7.6, 7.6))
        band_palette = ["#2E86AB", "#F18F01", "#C73E1D", "#6C5B7B", "#17B890", "#7E57C2"]
        rank_palette = ["#1B998B", "#3A86FF", "#FB5607", "#FFBE0B", "#8338EC", "#FF006E"]

        # Largest band first so smaller bands are drawn on top of it
        for i, item in enumerate(sorted(polys, key=lambda d: d["minutes"], reverse=True)):
            poly = MplPolygon(item["coords"], closed=True,
                              facecolor=band_palette[i % len(band_palette)], alpha=0.25,
                              edgecolor=band_palette[i % len(band_palette)], linewidth=1.6,
                              label=f"{item['minutes']} min · {item['area']:.1f} km²")
            ax.add_patch(poly)

        ax.scatter([center[1]], [center[0]], s=68, zorder=6,
                   facecolor="#000", edgecolor="white", linewidth=1.2)
        ax.annotate("center", (center[1], center[0]),
                    xytext=(center[1] + 0.01, center[0] + 0.01),
                    fontsize=9, color="#303030",
                    bbox=dict(boxstyle="round,pad=0.2", fc="white", ec="none", alpha=0.75),
                    arrowprops=dict(arrowstyle="-", lw=0.7, color="#666", alpha=0.9))

        # Marker size shrinks with rank; ranks beyond the table use size 30
        sizes = [90, 80, 72, 64, 56, 50, 46, 42, 38, 34]
        legend_rows = []
        for idx, (lat, lon, score) in enumerate(top_candidates, start=1):
            color = rank_palette[(idx - 1) % len(rank_palette)]
            size = sizes[idx - 1] if idx - 1 < len(sizes) else 30
            ax.scatter([lon], [lat], s=size, zorder=7,
                       facecolor=color, edgecolor="white", linewidth=1.0)
            ax.annotate(f"{idx} · {score:.2f}", (lon, lat),
                        xytext=(lon + 0.008, lat + 0.008),
                        fontsize=8, color="#111",
                        bbox=dict(boxstyle="round,pad=0.2", fc="white", ec="#bbb", alpha=0.9))
            legend_rows.append(f"{idx}. ({score:.2f}) {lat:.4f}, {lon:.4f}")
            lons.append(lon)
            lats.append(lat)

        if lons and lats:
            minx, maxx = min(lons), max(lons)
            miny, maxy = min(lats), max(lats)
            pad_x = max((maxx - minx) * 0.08, 0.01)
            pad_y = max((maxy - miny) * 0.08, 0.01)
            ax.set_xlim(minx - pad_x, maxx + pad_x)
            ax.set_ylim(miny - pad_y, maxy + pad_y)

        ax.set_title(f"Top sites — {business} @ {city}", fontsize=13, pad=10)
        ax.set_xlabel("Longitude")
        ax.set_ylabel("Latitude")
        ax.grid(True, linestyle=":", linewidth=0.6, alpha=0.6)
        for spine in ["top", "right"]:
            ax.spines[spine].set_visible(False)

        leg = ax.legend(loc="lower right", frameon=True, fontsize=8, title="Isochrones")
        if leg and leg.get_frame():
            leg.get_frame().set_alpha(0.9)

        # Top-K text box, positioned relative to the final axis limits
        x0, x1 = ax.get_xlim()
        y0, y1 = ax.get_ylim()
        x_text = x0 + (x1 - x0) * 0.70
        y_text = y0 + (y1 - y0) * 0.97
        ax.text(x_text, y_text,
                "Top-K (score)\n" + "\n".join(legend_rows),
                ha="left", va="top", fontsize=8, color="#111",
                bbox=dict(boxstyle="round,pad=0.3", fc="white", ec="#ccc", alpha=0.9))

        # 5 km scale bar, converted to degrees of longitude at the center latitude
        km_per_deg_x = _km_per_deg_lon(center[0])
        deg_len = 5.0 / km_per_deg_x if km_per_deg_x > 0 else 0.05
        px = x0 + (x1 - x0) * 0.10
        py = y0 + (y1 - y0) * 0.08
        ax.plot([px, px + deg_len], [py, py], lw=3, color="#222")
        ax.plot([px, px], [py - 0.001, py + 0.001], lw=2, color="#222")
        ax.plot([px + deg_len, px + deg_len], [py - 0.001, py + 0.001], lw=2, color="#222")
        ax.text(px + deg_len / 2.0, py + 0.002, "5 km",
                ha="center", va="bottom", fontsize=8, color="#222",
                bbox=dict(boxstyle="round,pad=0.2", fc="white", ec="none", alpha=0.7))

        fig.tight_layout()
        fig.savefig(out_path, dpi=150)
        return f"{settings.MEDIA_URL}sites/{out_path.name}"
    except Exception:
        return None
    finally:
        # Close on success and on failure alike; pyplot keeps figures
        # registered until they are explicitly closed.
        if fig is not None:
            plt.close(fig)
# --------------------------- Density maps: demand / competition ---------------------------
def _grid_kde(xy: np.ndarray, weights: Optional[np.ndarray],
x_grid: np.ndarray, y_grid: np.ndarray, bw: Optional[float] = None) -> np.ndarray:
if xy.shape[1] != 2 or xy.shape[0] < 2:
return np.zeros((y_grid.size, x_grid.size), dtype=float)
kde = gaussian_kde(xy.T, weights=weights, bw_method=bw)
Xg, Yg = np.meshgrid(x_grid, y_grid)
pts = np.vstack([Xg.ravel(), Yg.ravel()])
z = kde(pts).reshape(Yg.shape)
z = z - z.min()
if z.max() > 0:
z = z / z.max()
return z
def _render_density_map(kind: str,
                        center: Tuple[float, float],
                        iso_list: List[dict],
                        points_xy: np.ndarray,
                        weights: Optional[np.ndarray],
                        search_id: str,
                        city: str,
                        business: str) -> Optional[str]:
    """Render a KDE heat map of ``points_xy`` and return its media URL.

    The plot extent is the padded bounding box of the isochrones (or a
    ±0.08° box around ``center`` when none is usable). Points are scaled from
    degrees to approximate kilometres before the KDE so the kernel is not
    stretched along longitude; the image itself is drawn in lon/lat.

    Args:
        kind: ``"demand"`` selects the demand colour map and title; any other
            value is rendered as the competition map. Also used as the file
            name prefix.
        center: ``(lat, lon)`` of the reference point.
        iso_list: isochrone records; their outlines are overlaid on the heat map.
        points_xy: array with lon in column 0 and lat in column 1.
        weights: optional per-point weights for the KDE.
        search_id: identifier embedded in the output file name.
        city: used in the plot title only.
        business: used in the plot title only.

    Returns:
        ``f"{settings.MEDIA_URL}sites/<file name>"`` on success; ``None`` when
        ``points_xy`` is empty or any step fails (rendering is best-effort).
        The figure is closed on both paths so a failed render does not leave
        an open pyplot figure behind.
    """
    fig = None
    try:
        extent = _extent_from_iso_list(iso_list)
        if extent is None:
            cx, cy = center[1], center[0]
            extent = (cx - 0.08, cy - 0.08, cx + 0.08, cy + 0.08)
        minx, miny, maxx, maxy = extent
        pad_x = max((maxx - minx) * 0.05, 0.01)
        pad_y = max((maxy - miny) * 0.05, 0.01)
        minx -= pad_x
        maxx += pad_x
        miny -= pad_y
        maxy += pad_y

        # Degrees -> km factors, using the center latitude clamped into the extent
        lat0 = max(miny, min(maxy, center[0]))
        kx = _km_per_deg_lon(lat0)
        ky = _km_per_deg_lat()

        if points_xy.size == 0:
            return None
        xs = points_xy[:, 0] * kx
        ys = points_xy[:, 1] * ky

        grid_n = int(os.getenv("HEAT_GRID_N", "220"))
        xg = np.linspace(minx * kx, maxx * kx, grid_n)
        yg = np.linspace(miny * ky, maxy * ky, grid_n)
        z = _grid_kde(np.c_[xs, ys], weights, xg, yg, bw=None)

        media_dir = settings.MEDIA_ROOT / "sites"
        media_dir.mkdir(parents=True, exist_ok=True)
        out_path = media_dir / f"{kind}_{search_id}.png"

        fig, ax = plt.subplots(figsize=(8.0, 7.0))
        im = ax.imshow(z, origin="lower",
                       extent=(minx, maxx, miny, maxy),
                       interpolation="bilinear", alpha=0.85)
        if kind == "demand":
            im.set_cmap("YlOrRd")
            title = f"Demand heat — {business} @ {city}"
        else:
            im.set_cmap("GnBu")
            title = f"Competition heat — {business} @ {city}"

        cs = ax.contour(z, levels=6, linewidths=0.8, alpha=0.8,
                        extent=(minx, maxx, miny, maxy), colors="k")
        ax.clabel(cs, inline=True, fontsize=7, fmt="%.2f")

        # Overlay the isochrone outlines, largest band first
        for item in sorted(iso_list, key=lambda d: d["minutes"], reverse=True):
            feat = item.get("feature") or {}
            geom = feat.get("geometry") or {}
            if geom.get("type") != "Polygon":
                continue
            coords = geom.get("coordinates")[0]
            ring = np.array([(float(x[0]), float(x[1])) for x in coords])
            ax.plot(ring[:, 0], ring[:, 1], lw=1.2, alpha=0.9)

        ax.scatter([center[1]], [center[0]], s=55, zorder=5,
                   facecolor="#000", edgecolor="white", linewidth=1.0)
        ax.set_title(title, fontsize=13, pad=10)
        ax.set_xlabel("Longitude")
        ax.set_ylabel("Latitude")
        ax.grid(True, linestyle=":", linewidth=0.5, alpha=0.5)
        for spine in ["top", "right"]:
            ax.spines[spine].set_visible(False)

        cbar = plt.colorbar(im, ax=ax, fraction=0.046, pad=0.04)
        cbar.ax.set_ylabel("relative intensity", rotation=90, labelpad=8)

        fig.tight_layout()
        fig.savefig(out_path, dpi=150)
        return f"{settings.MEDIA_URL}sites/{out_path.name}"
    except Exception:
        return None
    finally:
        # Close on success and on failure alike; pyplot keeps figures
        # registered until they are explicitly closed.
        if fig is not None:
            plt.close(fig)
def _render_demand_map(center: Tuple[float, float], iso_list: List[dict],
                       city: str, search_id: str, business: str) -> Optional[str]:
    """Render the population-weighted demand heat map for ``city``.

    Loads the population grid from the data provider, drops rows with missing
    ``lon``/``lat``/``pop`` and delegates to ``_render_density_map`` with
    ``pop`` as the KDE weights. Returns the map URL, or ``None`` when the
    provider call fails, the grid is empty, or rendering fails.
    """
    provider = get_provider()
    try:
        grid = provider.popgrid(city)  # cell_id, lat, lon, pop
    except Exception:
        return None
    if grid.empty:
        return None

    usable = grid[["lon", "lat", "pop"]].dropna().copy()
    return _render_density_map(
        "demand",
        center,
        iso_list,
        usable[["lon", "lat"]].to_numpy(dtype=float),
        usable["pop"].to_numpy(dtype=float),
        search_id,
        city,
        business,
    )
def _render_competition_map(center: Tuple[float, float], iso_list: List[dict],
                            city: str, business: str, search_id: str) -> Optional[str]:
    """Render the unweighted competitor-density heat map for ``business`` in ``city``.

    Loads the POIs from the data provider, drops rows with missing
    ``lon``/``lat`` and delegates to ``_render_density_map`` without weights.
    Returns the map URL, or ``None`` when the provider call fails, there are
    no POIs, or rendering fails.
    """
    provider = get_provider()
    try:
        pois = provider.denue(city, business)  # name, lat, lon, category
    except Exception:
        return None
    if pois.empty:
        return None

    located = pois[["lon", "lat"]].dropna().copy()
    return _render_density_map(
        "competition",
        center,
        iso_list,
        located.to_numpy(dtype=float),
        None,
        search_id,
        city,
        business,
    )
# --------------------------- Per-search GeoJSON artifact ---------------------------
def _fc(features: List[Dict[str, Any]]) -> Dict[str, Any]:
return {"type": "FeatureCollection", "features": features}
def _candidates_fc(center: Tuple[float,float],
                   top: List[Tuple[float,float,float,ScoreBreakdown,List[str],List[dict]]]) -> Dict[str, Any]:
    """Convert ranked candidates into a FeatureCollection of Point features.

    Each entry of ``top`` is ``(lat, lon, score, breakdown, reasons, iso_list)``.
    The 1-based position becomes ``rank``; ``is_center`` is true when the
    candidate coincides with ``center`` (``(lat, lon)``) within ``1e-9`` degrees.
    """
    def _as_feature(rank: int, entry) -> Dict[str, Any]:
        lat, lon, score, br, _reasons, _iso = entry
        return {
            "type": "Feature",
            "geometry": {"type": "Point", "coordinates": [float(lon), float(lat)]},
            "properties": {
                "rank": rank,
                "score": float(score),
                "access": float(br.access),
                "demand": float(br.demand),
                "competition": float(br.competition),
                "is_center": abs(lat - center[0]) < 1e-9 and abs(lon - center[1]) < 1e-9,
            }
        }

    return _fc([_as_feature(rank, entry) for rank, entry in enumerate(top, start=1)])
def _isochrones_fc(iso_list: List[dict]) -> Dict[str, Any]:
    """Collect the ``"feature"`` of every isochrone record into a FeatureCollection.

    The features are reused as-is; they already carry the
    ``{"minutes", "area_km2"}`` properties.
    """
    return _fc([record["feature"] for record in iso_list])
def _pois_fc(pois_df, poly: Polygon) -> Dict[str, Any]:
    """Return the POIs lying inside ``poly`` as a FeatureCollection of Points.

    Args:
        pois_df: DataFrame with ``lon``/``lat`` columns and optional
            ``name``/``category`` columns; ``None`` or an empty frame yields an
            empty collection.
        poly: polygon used as the spatial filter.

    Returns:
        At most ``MAX_POIS_GEOJSON`` features (environment variable, default
        ``1000``), in DataFrame order. The cap is parsed once up front; a
        malformed value falls back to the default instead of silently
        disabling the cap, and a cap of 0 or less yields an empty collection.
    """
    feats = []
    if pois_df is None or pois_df.empty:
        return _fc(feats)

    default_cap = 1000
    try:
        cap = int(os.getenv("MAX_POIS_GEOJSON", str(default_cap)))
    except ValueError:
        cap = default_cap

    for row in pois_df.itertuples(index=False):
        if len(feats) >= cap:
            break
        try:
            lon = float(row.lon)
            lat = float(row.lat)
            if not poly.contains(Point(lon, lat)):
                continue
        except Exception:
            # Best-effort: rows with unusable coordinates are skipped.
            continue
        feats.append({
            "type": "Feature",
            "geometry": {"type": "Point", "coordinates": [lon, lat]},
            "properties": {
                "name": getattr(row, "name", None),
                "category": getattr(row, "category", None),
            }
        })
    return _fc(feats)
def _popgrid_fc(grid_df, poly: Polygon) -> Dict[str, Any]:
    """Return the most populated grid cells inside ``poly`` as a FeatureCollection.

    Args:
        grid_df: DataFrame with ``lon``, ``lat`` and ``pop`` columns; ``None``
            or an empty frame yields an empty collection.
        poly: polygon used as the spatial filter.

    Returns:
        Point features with a ``pop`` property, sorted by population
        descending and truncated to ``MAX_POPGRID_GEOJSON`` entries
        (environment variable, default ``800``). Cells whose population is NaN
        or infinite are skipped: they would make the sort order unreliable and
        would be serialized as the non-standard JSON token ``NaN``/``Infinity``.
    """
    feats = []
    if grid_df is None or grid_df.empty:
        return _fc(feats)

    # Keep only the cells inside the polygon
    inside = []
    for row in grid_df.itertuples(index=False):
        try:
            lon = float(row.lon)
            lat = float(row.lat)
            pop = float(row.pop)
            if math.isfinite(pop) and poly.contains(Point(lon, lat)):
                inside.append((lon, lat, pop))
        except Exception:
            # Best-effort: rows with unusable values are skipped.
            continue
    if not inside:
        return _fc(feats)

    # Sort by population descending and apply the cap
    inside.sort(key=lambda t: t[2], reverse=True)
    cap = int(os.getenv("MAX_POPGRID_GEOJSON", "800"))
    inside = inside[:cap]
    for lon, lat, pop in inside:
        feats.append({
            "type": "Feature",
            "geometry": {"type": "Point", "coordinates": [lon, lat]},
            "properties": {"pop": pop}
        })
    return _fc(feats)
def _save_run_artifact(search_id: str,
                       req: SiteSearchRequest,
                       chosen_center: Tuple[float,float],
                       top: List[Tuple[float,float,float,ScoreBreakdown,List[str],List[dict]]],
                       iso_list: List[dict]) -> Optional[str]:
    """
    Save a JSON artifact for this search containing:
    - the request (``req.model_dump()``) and the chosen center
    - candidates_fc
    - isochrones_fc
    - pois_competition_fc
    - popgrid_fc (sample)

    The POI and population layers are clipped to the largest-minutes isochrone
    and are empty when that polygon is unusable or the provider call fails.

    Returns:
        The path of ``MEDIA_ROOT/sites/run_<search_id>.json`` as a string, or
        ``None`` if anything fails (the artifact is best-effort). The document
        is serialized in memory before the file is opened, so a serialization
        error does not leave a truncated file on disk.
    """
    # Function-scope import: the module only imports the `datetime` class.
    from datetime import timezone

    try:
        media_dir = settings.MEDIA_ROOT / "sites"
        media_dir.mkdir(parents=True, exist_ok=True)
        out_path = media_dir / f"run_{search_id}.json"

        # Largest polygon, used to clip the POI / population layers
        largest = max(iso_list, key=lambda x: x["minutes"]) if iso_list else None
        poly = _polygon_from_feature(largest["feature"]) if largest else None

        prov = get_provider()
        try:
            pois = prov.denue(req.city, req.business)
        except Exception:
            pois = None
        try:
            grid = prov.popgrid(req.city)
        except Exception:
            grid = None

        # Same "<naive UTC ISO timestamp>Z" text as the deprecated
        # datetime.utcnow().isoformat() + "Z".
        created_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"

        artifact = {
            "version": "sites-artifact-1",
            "created_at": created_at,
            "request": req.model_dump(),
            "center": {"lat": chosen_center[0], "lon": chosen_center[1]},
            "candidates_fc": _candidates_fc(chosen_center, top),
            "isochrones_fc": _isochrones_fc(iso_list),
            "pois_competition_fc": _pois_fc(pois, poly) if poly is not None else _fc([]),
            "popgrid_fc": _popgrid_fc(grid, poly) if poly is not None else _fc([]),
        }

        payload = json.dumps(artifact, ensure_ascii=False)
        with open(out_path, "w", encoding="utf-8") as f:
            f.write(payload)
        return str(out_path)
    except Exception:
        return None
# --------------------------- Main estimator ---------------------------
def _score_candidate(req: SiteSearchRequest, lat: float, lon: float, iso_list: List[dict],
                     w_access: float, w_demand: float, w_comp: float
                     ) -> Tuple[float, float, float, ScoreBreakdown, List[str], List[dict]]:
    """Score one location from its isochrones and return ``(lat, lon, score, breakdown, reasons, iso_list)``."""
    access, access_r = _access_from_iso_list(iso_list)
    comp, comp_r = _competition_from_pois(req.city, req.business, iso_list)
    dem, dem_r = _demand_from_popgrid(req.city, iso_list)
    score = w_access * access + w_demand * dem + w_comp * comp
    score = float(max(0.0, min(1.0, score)))
    breakdown = ScoreBreakdown(demand=dem, competition=comp, access=access)
    reasons = (["Access from isochrone areas (normalized avg)"] + access_r +
               ["Competition from POI density (largest band)"] + comp_r +
               ["Demand from population grid (largest band)"] + dem_r)
    return lat, lon, score, breakdown, reasons, iso_list


def run_site_search(req: SiteSearchRequest) -> SiteSearchResponse:
    """Run a site search and return ranked candidate locations with their maps.

    Behaviour by input:
    - No ``req.center``: returns ``req.max_candidates`` neutral stub candidates
      at (0, 0) and the ``no_center_provided_stub_output`` warning.
    - Center given but its largest isochrone is not a usable polygon: only the
      center is scored (warning ``sampling_fallback_invalid_polygon``).
    - Otherwise: the center plus ``req.num_samples`` points sampled inside the
      largest isochrone are scored, and the best ``req.max_candidates`` (at
      least one) are returned, ordered by descending score.

    The total score is ``w_access*access + w_demand*demand + w_comp*competition``
    clamped to ``[0, 1]``, with weights read from ``WEIGHT_ACCESS`` (0.35),
    ``WEIGHT_DEMAND`` (0.40) and ``WEIGHT_COMP`` (0.25).

    Side effects: renders the sites/demand/competition PNG maps and writes the
    JSON run artifact; each of these is best-effort.
    """
    search_id = str(uuid.uuid4())
    warnings: List[str] = []
    candidates: List[CandidateSite] = []
    map_url: str | None = None
    demand_map_url: Optional[str] = None
    competition_map_url: Optional[str] = None

    w_access = float(os.getenv("WEIGHT_ACCESS", "0.35"))
    w_demand = float(os.getenv("WEIGHT_DEMAND", "0.40"))
    w_comp = float(os.getenv("WEIGHT_COMP", "0.25"))

    if req.center:
        center = (float(req.center[0]), float(req.center[1]))
        base_iso = _build_isochrones(center, req.time_bands or [])
        largest = max(base_iso, key=lambda x: x["minutes"]) if base_iso else None
        poly = _polygon_from_feature(largest["feature"]) if largest else None

        if poly is None:
            center_entry = _score_candidate(req, center[0], center[1], base_iso,
                                            w_access, w_demand, w_comp)
            _lat, _lon, score, breakdown, reasons, _iso = center_entry
            candidates.append(CandidateSite(lat=center[0], lon=center[1], score=score,
                                            breakdown=breakdown, reasons=reasons))
            map_url = _save_sites_map(center, base_iso, search_id, req.city, req.business,
                                      [(center[0], center[1], score)])
            warnings.append("sampling_fallback_invalid_polygon")
            demand_map_url = _render_demand_map(center, base_iso, req.city, search_id, req.business)
            competition_map_url = _render_competition_map(center, base_iso, req.city, req.business, search_id)
            # Artifact (center only)
            _save_run_artifact(search_id, req, center, [center_entry], base_iso)
        else:
            # Seed derived from the search id so the sampling is reproducible per search
            rng = random.Random(int(search_id.replace("-", ""), 16) & 0xFFFFFFFF)
            samples = _sample_points_in_polygon(poly, int(req.num_samples), rng)

            # The center's isochrones were already built above for the same
            # point and bands; reuse them instead of asking the routing
            # provider again.
            scored: List[Tuple[float, float, float, ScoreBreakdown, List[str], List[dict]]] = [
                _score_candidate(req, center[0], center[1], base_iso, w_access, w_demand, w_comp)
            ]
            for (lat, lon) in samples:
                iso_list = _build_isochrones((lat, lon), req.time_bands or [])
                scored.append(_score_candidate(req, lat, lon, iso_list,
                                               w_access, w_demand, w_comp))

            scored.sort(key=lambda t: t[2], reverse=True)
            top = scored[: max(1, int(req.max_candidates))]
            for (lat, lon, score, breakdown, reasons, _iso) in top:
                candidates.append(CandidateSite(
                    lat=lat, lon=lon, score=score, breakdown=breakdown, reasons=reasons
                ))

            best_center = (top[0][0], top[0][1])
            top1_iso = top[0][5]
            top_points = [(lat, lon, score) for (lat, lon, score, *_rest) in top]
            map_url = _save_sites_map(best_center, top1_iso, search_id,
                                      req.city, req.business, top_points)
            warnings.append("multi_candidate_sampling_ok")
            demand_map_url = _render_demand_map(best_center, top1_iso, req.city, search_id, req.business)
            competition_map_url = _render_competition_map(best_center, top1_iso, req.city, req.business, search_id)
            warnings.append("demand_map_saved" if demand_map_url else "demand_map_failed")
            warnings.append("competition_map_saved" if competition_map_url else "competition_map_failed")
            # Artifact (Top-K + isochrones of the Top-1)
            _save_run_artifact(search_id, req, best_center, top, top1_iso)
    else:
        neutral = ScoreBreakdown(demand=0.5, competition=0.5, access=0.5)
        for i in range(req.max_candidates):
            candidates.append(CandidateSite(
                lat=0.0, lon=0.0, score=0.5,
                breakdown=neutral,
                reasons=[f"stub candidate #{i+1} for {req.business} in {req.city}"],
            ))
        warnings.append("no_center_provided_stub_output")

    return SiteSearchResponse(
        search_id=search_id,
        city=req.city,
        business=req.business,
        time_bands=req.time_bands,
        candidates=candidates,
        map_url=map_url,
        demand_map_url=demand_map_url,
        competition_map_url=competition_map_url,
        data_release=req.data_release,
        warnings=warnings,
    )