724 lines
29 KiB
Python
724 lines
29 KiB
Python
# pxy_sites/services/site_scoring.py
|
|
from __future__ import annotations
|
|
import os, json, uuid, random, math
|
|
from typing import List, Tuple, Optional, Dict, Any
|
|
from datetime import datetime
|
|
|
|
from django.conf import settings
|
|
from pyproj import Geod
|
|
|
|
# Headless backend para matplotlib
|
|
import matplotlib
|
|
matplotlib.use("Agg")
|
|
import matplotlib.pyplot as plt
|
|
from matplotlib.patches import Polygon as MplPolygon
|
|
|
|
import numpy as np
|
|
from shapely.geometry import Point, Polygon
|
|
from scipy.stats import gaussian_kde
|
|
|
|
from pxy_contracts.contracts import (
|
|
SiteSearchRequest, SiteSearchResponse,
|
|
CandidateSite, ScoreBreakdown
|
|
)
|
|
from pxy_routing.services import get_routing_provider
|
|
from pxy_de.providers.base import get_provider
|
|
|
|
|
|
# --------------------------- Helpers geométricos ---------------------------
|
|
|
|
def _isochrone_area_km2(feature: dict) -> float:
|
|
geom = (feature or {}).get("geometry") or {}
|
|
if geom.get("type") != "Polygon":
|
|
return 0.0
|
|
rings = geom.get("coordinates") or []
|
|
if not rings:
|
|
return 0.0
|
|
coords = rings[0]
|
|
if len(coords) < 4:
|
|
return 0.0
|
|
geod = Geod(ellps="WGS84")
|
|
lons = [float(x[0]) for x in coords]
|
|
lats = [float(x[1]) for x in coords]
|
|
area_m2, _ = geod.polygon_area_perimeter(lons, lats)
|
|
return abs(area_m2) / 1_000_000.0 # m² -> km²
|
|
|
|
|
|
def _polygon_from_feature(feature: dict) -> Optional[Polygon]:
    """Build a shapely ``Polygon`` from the outer ring of a GeoJSON Polygon feature.

    Returns ``None`` when the feature is falsy, is not a ``Polygon``, has no
    (or an empty) outer ring, has fewer than four positions, or when
    converting the positions / constructing the polygon raises.
    """
    geometry = (feature or {}).get("geometry") or {}
    rings = geometry.get("coordinates") if geometry.get("type") == "Polygon" else None
    if not rings or not rings[0]:
        return None
    try:
        vertices = [(float(pos[0]), float(pos[1])) for pos in rings[0]]
        return Polygon(vertices) if len(vertices) >= 4 else None
    except Exception:
        # Malformed coordinates are treated as "no polygon" rather than an error.
        return None
|
|
|
|
|
|
def _extent_from_iso_list(iso_list: List[dict]) -> Optional[Tuple[float, float, float, float]]:
|
|
xs, ys = [], []
|
|
for item in iso_list or []:
|
|
feat = item.get("feature") or {}
|
|
geom = feat.get("geometry") or {}
|
|
if geom.get("type") != "Polygon":
|
|
continue
|
|
coords = geom.get("coordinates") or []
|
|
if not coords:
|
|
continue
|
|
ring = coords[0]
|
|
for x, y in ring:
|
|
xs.append(float(x)); ys.append(float(y))
|
|
if not xs or not ys:
|
|
return None
|
|
return (min(xs), min(ys), max(xs), max(ys))
|
|
|
|
|
|
def _build_isochrones(center: Tuple[float, float], time_bands: List[int]) -> List[dict]:
    """Build isochrones around ``center`` for the requested minute bands.

    - If the routing provider exposes ``isochrones(center, minutes_list)``, it
      is called once (fewer routing requests, less rate-limit pressure). The
      returned features are assumed to be in the same order as the bands; any
      bands the provider did not return are fetched one by one.
    - Otherwise, or if anything in the batched path raises, every band is
      fetched with one ``isochrone(center, minutes)`` call.

    Returns:
        A list of dicts ``{"minutes": int, "feature": Feature, "area_km2": float}``
        where the feature's ``properties`` repeat ``minutes`` and ``area_km2``.

    Raises:
        Whatever the provider's single-band ``isochrone`` call raises in the
        per-band fallback path.
    """
    rp = get_routing_provider()
    bands: List[int] = [int(m) for m in (time_bands or [])]

    def _entry(minutes: int, feat: dict) -> dict:
        # One output record; shared by the batched and per-band paths.
        area_km2 = float(_isochrone_area_km2(feat))
        props = {"minutes": minutes, "area_km2": area_km2}
        feature = {"type": "Feature", "geometry": feat.get("geometry"), "properties": props}
        return {"minutes": minutes, "feature": feature, "area_km2": area_km2}

    # Try a single batched call first.
    if hasattr(rp, "isochrones"):
        try:
            feats = rp.isochrones(center, bands)  # expected in the requested band order
            n = min(len(bands), len(feats))
            # Collect into a local list so a failure midway leaves nothing
            # behind; otherwise the fallback below would emit duplicate bands.
            batched = [_entry(m, feat) for m, feat in zip(bands[:n], feats[:n])]
            # Provider returned fewer features than requested: fill the rest.
            batched.extend(_entry(m, rp.isochrone(center, m)) for m in bands[n:])
            return batched
        except Exception:
            # Deliberate best effort: fall back to per-band calls below.
            pass

    # Fallback: one request per band (original behavior).
    return [_entry(m, rp.isochrone(center, m)) for m in bands]
|
|
|
|
|
|
|
|
def _access_from_iso_list(iso_list: List[dict]) -> Tuple[float, List[str]]:
|
|
if not iso_list:
|
|
return 0.0, ["no_isochrones"]
|
|
areas = [item["area_km2"] for item in iso_list]
|
|
max_a = max(areas) if areas else 0.0
|
|
if max_a <= 0:
|
|
return 0.0, [f"{item['minutes']} min area ≈ 0.0 km²" for item in iso_list]
|
|
norms = [a / max_a for a in areas]
|
|
access = sum(norms) / len(norms)
|
|
reasons = [f"{item['minutes']} min area ≈ {item['area_km2']:.1f} km²" for item in iso_list]
|
|
return float(access), reasons
|
|
|
|
|
|
# --------------------------- Scores data-driven ---------------------------
|
|
|
|
def _competition_from_pois(city: str, business: str, iso_list: List[dict]) -> Tuple[float, List[str]]:
    """Score competition from the density of competitor POIs inside the largest band.

    The POIs come from ``get_provider().denue(city, business)`` and are counted
    when they fall inside the polygon of the band with the most minutes. The
    score is ``1 / (1 + density / D_ref)`` clamped to [0, 1], where ``D_ref``
    is read from the ``COMP_REF_DENSITY`` environment variable (default 5.0).

    Returns:
        ``(score, reasons)``. A neutral 0.5 with a ``competition_fallback``
        reason is returned when the provider fails, there are no POIs or
        isochrones, the polygon is invalid, or the band area is zero.
    """
    default_ref_density = 5.0

    prov = get_provider()
    try:
        pois = prov.denue(city, business)  # DataFrame[name,lat,lon,category]
    except Exception as e:
        return 0.5, [f"competition_fallback: provider_error={e}"]

    # `None` is tolerated here for consistency with `_pois_fc`.
    if pois is None or pois.empty or not iso_list:
        return 0.5, ["competition_fallback: no_pois_or_isochrones"]

    largest = max(iso_list, key=lambda x: x["minutes"])
    poly = _polygon_from_feature(largest["feature"])
    if poly is None:
        return 0.5, ["competition_fallback: invalid_polygon"]

    area_km2 = float(largest.get("area_km2") or 0.0)
    if area_km2 <= 0.0:
        return 0.5, ["competition_fallback: zero_area"]

    cnt = 0
    for row in pois.itertuples(index=False):
        try:
            if poly.contains(Point(float(row.lon), float(row.lat))):
                cnt += 1
        except Exception:
            # Rows with unusable coordinates are skipped.
            continue

    density = cnt / area_km2  # POIs per km²

    # A non-numeric, zero, negative or NaN reference would crash or distort
    # the formula below, so fall back to the default in those cases.
    try:
        D_ref = float(os.getenv("COMP_REF_DENSITY", str(default_ref_density)))
    except ValueError:
        D_ref = default_ref_density
    if not D_ref > 0:
        D_ref = default_ref_density

    comp = 1.0 / (1.0 + density / D_ref)
    comp = float(max(0.0, min(1.0, comp)))

    reasons = [
        f"largest_band: {largest['minutes']} min, area ≈ {area_km2:.1f} km²",
        f"competitors_inside: {cnt}, density ≈ {density:.2f} /km²",
        f"competition_score = 1/(1 + density/{D_ref:.1f}) ≈ {comp:.2f}",
    ]
    return comp, reasons
|
|
|
|
|
|
def _demand_from_popgrid(city: str, iso_list: List[dict]) -> Tuple[float, List[str]]:
    """Score demand from the population-grid cells inside the largest band.

    The grid comes from ``get_provider().popgrid(city)``; the ``pop`` of every
    cell whose point lies inside the polygon of the band with the most minutes
    is summed. The score is ``pop / (pop + P_ref)`` clamped to [0, 1], where
    ``P_ref`` is read from the ``DEMAND_REF_POP`` environment variable
    (default 50000).

    Returns:
        ``(score, reasons)``. A neutral 0.5 with a ``demand_fallback`` reason
        is returned when the provider fails, there is no grid or isochrones,
        the polygon is invalid, or the band area is zero.
    """
    default_ref_pop = 50000.0

    prov = get_provider()
    try:
        grid = prov.popgrid(city)  # DataFrame[cell_id, lat, lon, pop]
    except Exception as e:
        return 0.5, [f"demand_fallback: provider_error={e}"]

    if grid is None or grid.empty or not iso_list:
        return 0.5, ["demand_fallback: no_grid_or_isochrones"]

    largest = max(iso_list, key=lambda x: x["minutes"])
    poly = _polygon_from_feature(largest["feature"])
    if poly is None:
        return 0.5, ["demand_fallback: invalid_polygon"]

    area_km2 = float(largest.get("area_km2") or 0.0)
    if area_km2 <= 0.0:
        return 0.5, ["demand_fallback: zero_area"]

    total_pop = 0.0
    for row in grid.itertuples(index=False):
        try:
            pop = float(row.pop)
            # A single NaN/inf cell would poison the sum: the clamp below maps
            # NaN to 1.0 and int(total_pop) in the reasons would raise.
            if not math.isfinite(pop):
                continue
            if poly.contains(Point(float(row.lon), float(row.lat))):
                total_pop += pop
        except Exception:
            # Rows with unusable values are skipped.
            continue

    density = total_pop / area_km2  # area_km2 > 0 is guaranteed above

    try:
        P_ref = float(os.getenv("DEMAND_REF_POP", "50000"))
    except ValueError:
        P_ref = default_ref_pop
    if not math.isfinite(P_ref):
        P_ref = default_ref_pop

    demand = total_pop / (total_pop + P_ref) if (total_pop + P_ref) > 0 else 0.0
    demand = float(max(0.0, min(1.0, demand)))

    reasons = [
        f"largest_band: {largest['minutes']} min, area ≈ {area_km2:.1f} km²",
        f"population_inside ≈ {int(total_pop)}, density ≈ {density:.1f} /km²",
        f"demand_score = pop/(pop+{int(P_ref)}) ≈ {demand:.2f}",
    ]
    return demand, reasons
|
|
|
|
|
|
# --------------------------- Sampling y Mapa principal ---------------------------
|
|
|
|
def _sample_points_in_polygon(poly: Polygon, n: int, rng: random.Random) -> List[Tuple[float, float]]:
    """Rejection-sample up to ``n`` points inside ``poly`` from its bounding box.

    Candidates are drawn uniformly from ``poly.bounds`` with ``rng`` and kept
    when ``poly.contains`` them. At most ``n * 50`` draws are made, so fewer
    than ``n`` points may be returned.

    Returns:
        Accepted points as ``(lat, lon)`` tuples, i.e. ``(y, x)``.
    """
    west, south, east, north = poly.bounds
    accepted: List[Tuple[float, float]] = []
    attempts_left = n * 50
    while attempts_left > 0 and len(accepted) < n:
        attempts_left -= 1
        lon = rng.uniform(west, east)
        lat = rng.uniform(south, north)
        if not poly.contains(Point(lon, lat)):
            continue
        accepted.append((lat, lon))  # (lat, lon)
    return accepted
|
|
|
|
|
|
def _km_per_deg_lon(lat_deg: float) -> float:
|
|
return 111.320 * math.cos(math.radians(lat_deg))
|
|
|
|
|
|
def _km_per_deg_lat() -> float:
|
|
return 110.574
|
|
|
|
|
|
def _save_sites_map(center: Tuple[float, float], iso_list_for_map: List[dict],
                    search_id: str, city: str, business: str,
                    top_candidates: List[Tuple[float, float, float]]) -> str | None:
    """Render the isochrones, the center and the ranked candidates to a PNG.

    Args:
        center: ``(lat, lon)`` of the reference point.
        iso_list_for_map: isochrone dicts with ``"minutes"``, ``"feature"`` and
            ``"area_km2"`` keys; non-Polygon geometries are skipped.
        search_id: used in the file name ``sites_{search_id}.png``.
        city, business: used in the plot title.
        top_candidates: ``(lat, lon, score)`` tuples, drawn in the given order
            with their 1-based rank.

    Returns:
        ``{MEDIA_URL}sites/<file name>`` on success, or ``None`` if anything
        fails (rendering is best effort and never raises).
    """
    from pathlib import Path  # local import keeps this block self-contained

    fig = None
    try:
        # Path() so that a plain-string MEDIA_ROOT works as well as a Path.
        media_dir = Path(settings.MEDIA_ROOT) / "sites"
        media_dir.mkdir(parents=True, exist_ok=True)
        out_path = media_dir / f"sites_{search_id}.png"

        # Collect polygons and the lon/lat extent.
        lons, lats = [center[1]], [center[0]]
        polys = []
        for item in iso_list_for_map:
            feat = item["feature"]
            geom = feat.get("geometry") or {}
            if geom.get("type") != "Polygon":
                continue
            coords = geom.get("coordinates")[0]
            poly_xy = [(float(x[0]), float(x[1])) for x in coords]
            polys.append({"minutes": item["minutes"], "coords": poly_xy, "area": item["area_km2"]})
            lons.extend([p[0] for p in poly_xy])
            lats.extend([p[1] for p in poly_xy])

        fig, ax = plt.subplots(figsize=(7.6, 7.6))

        band_palette = ["#2E86AB", "#F18F01", "#C73E1D", "#6C5B7B", "#17B890", "#7E57C2"]
        rank_palette = ["#1B998B", "#3A86FF", "#FB5607", "#FFBE0B", "#8338EC", "#FF006E"]

        # Largest band first so that smaller bands are drawn on top of it.
        for i, item in enumerate(sorted(polys, key=lambda d: d["minutes"], reverse=True)):
            band_color = band_palette[i % len(band_palette)]
            poly = MplPolygon(item["coords"], closed=True,
                              facecolor=band_color, alpha=0.25,
                              edgecolor=band_color, linewidth=1.6,
                              label=f"{item['minutes']} min · {item['area']:.1f} km²")
            ax.add_patch(poly)

        ax.scatter([center[1]], [center[0]], s=68, zorder=6,
                   facecolor="#000", edgecolor="white", linewidth=1.2)
        ax.annotate("center", (center[1], center[0]),
                    xytext=(center[1] + 0.01, center[0] + 0.01),
                    fontsize=9, color="#303030",
                    bbox=dict(boxstyle="round,pad=0.2", fc="white", ec="none", alpha=0.75),
                    arrowprops=dict(arrowstyle="-", lw=0.7, color="#666", alpha=0.9))

        # Marker sizes shrink with rank; ranks beyond the table use 30.
        sizes = [90, 80, 72, 64, 56, 50, 46, 42, 38, 34]
        legend_rows = []
        for idx, (lat, lon, score) in enumerate(top_candidates, start=1):
            color = rank_palette[(idx - 1) % len(rank_palette)]
            size = sizes[idx - 1] if idx - 1 < len(sizes) else 30
            ax.scatter([lon], [lat], s=size, zorder=7,
                       facecolor=color, edgecolor="white", linewidth=1.0)
            ax.annotate(f"{idx} · {score:.2f}", (lon, lat),
                        xytext=(lon + 0.008, lat + 0.008),
                        fontsize=8, color="#111",
                        bbox=dict(boxstyle="round,pad=0.2", fc="white", ec="#bbb", alpha=0.9))
            legend_rows.append(f"{idx}. ({score:.2f}) {lat:.4f}, {lon:.4f}")
            lons.append(lon); lats.append(lat)

        if lons and lats:
            minx, maxx = min(lons), max(lons)
            miny, maxy = min(lats), max(lats)
            pad_x = max((maxx - minx) * 0.08, 0.01)
            pad_y = max((maxy - miny) * 0.08, 0.01)
            ax.set_xlim(minx - pad_x, maxx + pad_x)
            ax.set_ylim(miny - pad_y, maxy + pad_y)

        ax.set_title(f"Top sites — {business} @ {city}", fontsize=13, pad=10)
        ax.set_xlabel("Longitude")
        ax.set_ylabel("Latitude")
        ax.grid(True, linestyle=":", linewidth=0.6, alpha=0.6)
        for spine in ["top", "right"]:
            ax.spines[spine].set_visible(False)

        leg = ax.legend(loc="lower right", frameon=True, fontsize=8, title="Isochrones")
        if leg and leg.get_frame():
            leg.get_frame().set_alpha(0.9)

        # Text box listing the ranked candidates, placed near the top right.
        x0, x1 = ax.get_xlim()
        y0, y1 = ax.get_ylim()
        x_text = x0 + (x1 - x0) * 0.70
        y_text = y0 + (y1 - y0) * 0.97
        ax.text(x_text, y_text,
                "Top-K (score)\n" + "\n".join(legend_rows),
                ha="left", va="top", fontsize=8, color="#111",
                bbox=dict(boxstyle="round,pad=0.3", fc="white", ec="#ccc", alpha=0.9))

        # 5 km scale bar, converted to degrees of longitude at the center latitude.
        km_per_deg_x = _km_per_deg_lon(center[0])
        deg_len = 5.0 / km_per_deg_x if km_per_deg_x > 0 else 0.05
        px = x0 + (x1 - x0) * 0.10
        py = y0 + (y1 - y0) * 0.08
        ax.plot([px, px + deg_len], [py, py], lw=3, color="#222")
        ax.plot([px, px], [py - 0.001, py + 0.001], lw=2, color="#222")
        ax.plot([px + deg_len, px + deg_len], [py - 0.001, py + 0.001], lw=2, color="#222")
        ax.text(px + deg_len / 2.0, py + 0.002, "5 km",
                ha="center", va="bottom", fontsize=8, color="#222",
                bbox=dict(boxstyle="round,pad=0.2", fc="white", ec="none", alpha=0.7))

        fig.tight_layout()
        fig.savefig(out_path, dpi=150)

        return f"{settings.MEDIA_URL}sites/{out_path.name}"
    except Exception:
        return None
    finally:
        # pyplot keeps every figure registered until it is closed; close it on
        # the failure path too so failed renders do not accumulate in memory.
        if fig is not None:
            plt.close(fig)
|
|
|
|
|
|
# --------------------------- Mapas densidad: Demanda / Competencia ---------------------------
|
|
|
|
def _grid_kde(xy: np.ndarray, weights: Optional[np.ndarray],
|
|
x_grid: np.ndarray, y_grid: np.ndarray, bw: Optional[float] = None) -> np.ndarray:
|
|
if xy.shape[1] != 2 or xy.shape[0] < 2:
|
|
return np.zeros((y_grid.size, x_grid.size), dtype=float)
|
|
kde = gaussian_kde(xy.T, weights=weights, bw_method=bw)
|
|
Xg, Yg = np.meshgrid(x_grid, y_grid)
|
|
pts = np.vstack([Xg.ravel(), Yg.ravel()])
|
|
z = kde(pts).reshape(Yg.shape)
|
|
z = z - z.min()
|
|
if z.max() > 0:
|
|
z = z / z.max()
|
|
return z
|
|
|
|
|
|
def _render_density_map(kind: str,
                        center: Tuple[float, float],
                        iso_list: List[dict],
                        points_xy: np.ndarray,
                        weights: Optional[np.ndarray],
                        search_id: str,
                        city: str,
                        business: str) -> Optional[str]:
    """Render a KDE heat map of ``points_xy`` with isochrone outlines to a PNG.

    Args:
        kind: ``"demand"`` selects the demand colormap and title; any other
            value is rendered as a competition map. Also used in the file
            name ``{kind}_{search_id}.png``.
        center: ``(lat, lon)`` of the reference point.
        iso_list: isochrone dicts; their extent defines the map window (a
            fixed window around the center is used when there is none).
        points_xy: array whose first two columns are lon and lat.
        weights: optional per-point weights for the KDE.
        search_id, city, business: used in the file name and title.

    Returns:
        ``{MEDIA_URL}sites/<file name>`` on success; ``None`` when there are
        no points or anything fails (rendering is best effort).
    """
    from pathlib import Path  # local import keeps this block self-contained

    fig = None
    try:
        extent = _extent_from_iso_list(iso_list)
        if extent is None:
            cx, cy = center[1], center[0]
            extent = (cx - 0.08, cy - 0.08, cx + 0.08, cy + 0.08)
        minx, miny, maxx, maxy = extent
        pad_x = max((maxx - minx) * 0.05, 0.01)
        pad_y = max((maxy - miny) * 0.05, 0.01)
        minx -= pad_x; maxx += pad_x
        miny -= pad_y; maxy += pad_y

        # The KDE runs in km so the kernel is not stretched by the
        # latitude-dependent length of a degree of longitude.
        lat0 = max(miny, min(maxy, center[0]))
        kx = _km_per_deg_lon(lat0)
        ky = _km_per_deg_lat()

        if points_xy.size == 0:
            return None
        xs = points_xy[:, 0] * kx
        ys = points_xy[:, 1] * ky

        grid_n = int(os.getenv("HEAT_GRID_N", "220"))
        xg = np.linspace(minx * kx, maxx * kx, grid_n)
        yg = np.linspace(miny * ky, maxy * ky, grid_n)

        z = _grid_kde(np.c_[xs, ys], weights, xg, yg, bw=None)

        # Path() so that a plain-string MEDIA_ROOT works as well as a Path.
        media_dir = Path(settings.MEDIA_ROOT) / "sites"
        media_dir.mkdir(parents=True, exist_ok=True)
        out_path = media_dir / f"{kind}_{search_id}.png"

        fig, ax = plt.subplots(figsize=(8.0, 7.0))
        # The grid was computed in km but is drawn back in degrees.
        im = ax.imshow(z, origin="lower",
                       extent=(minx, maxx, miny, maxy),
                       interpolation="bilinear", alpha=0.85)
        if kind == "demand":
            im.set_cmap("YlOrRd")
            title = f"Demand heat — {business} @ {city}"
        else:
            im.set_cmap("GnBu")
            title = f"Competition heat — {business} @ {city}"

        cs = ax.contour(z, levels=6, linewidths=0.8, alpha=0.8,
                        extent=(minx, maxx, miny, maxy), colors="k")
        ax.clabel(cs, inline=True, fontsize=7, fmt="%.2f")

        for item in sorted(iso_list, key=lambda d: d["minutes"], reverse=True):
            feat = item.get("feature") or {}
            geom = feat.get("geometry") or {}
            if geom.get("type") != "Polygon":
                continue
            coords = geom.get("coordinates")[0]
            ring = np.array([(float(x[0]), float(x[1])) for x in coords])
            ax.plot(ring[:, 0], ring[:, 1], lw=1.2, alpha=0.9)

        ax.scatter([center[1]], [center[0]], s=55, zorder=5,
                   facecolor="#000", edgecolor="white", linewidth=1.0)

        ax.set_title(title, fontsize=13, pad=10)
        ax.set_xlabel("Longitude")
        ax.set_ylabel("Latitude")
        ax.grid(True, linestyle=":", linewidth=0.5, alpha=0.5)
        for spine in ["top", "right"]:
            ax.spines[spine].set_visible(False)

        # Use the figure's own colorbar rather than pyplot's "current figure".
        cbar = fig.colorbar(im, ax=ax, fraction=0.046, pad=0.04)
        cbar.ax.set_ylabel("relative intensity", rotation=90, labelpad=8)

        fig.tight_layout()
        fig.savefig(out_path, dpi=150)

        return f"{settings.MEDIA_URL}sites/{out_path.name}"
    except Exception:
        return None
    finally:
        # pyplot keeps every figure registered until it is closed; close it on
        # the failure path too so failed renders do not accumulate in memory.
        if fig is not None:
            plt.close(fig)
|
|
|
|
|
|
def _render_demand_map(center: Tuple[float, float], iso_list: List[dict],
                       city: str, search_id: str, business: str) -> Optional[str]:
    """Render the population-weighted demand heat map for ``city``.

    Loads the population grid from the data provider, drops rows with missing
    ``lon``/``lat``/``pop`` and delegates to ``_render_density_map`` with the
    population as weights. Returns ``None`` when the provider raises or the
    grid is empty; otherwise whatever ``_render_density_map`` returns.
    """
    provider = get_provider()
    try:
        population = provider.popgrid(city)  # cell_id, lat, lon, pop
    except Exception:
        return None
    if population.empty:
        return None

    cells = population[["lon", "lat", "pop"]].dropna().copy()
    lonlat = cells[["lon", "lat"]].to_numpy(dtype=float)
    pop_weights = cells["pop"].to_numpy(dtype=float)
    return _render_density_map(
        kind="demand",
        center=center,
        iso_list=iso_list,
        points_xy=lonlat,
        weights=pop_weights,
        search_id=search_id,
        city=city,
        business=business,
    )
|
|
|
|
|
|
def _render_competition_map(center: Tuple[float, float], iso_list: List[dict],
                            city: str, business: str, search_id: str) -> Optional[str]:
    """Render the unweighted competitor heat map for ``business`` in ``city``.

    Loads the competitor POIs from the data provider, drops rows with missing
    ``lon``/``lat`` and delegates to ``_render_density_map`` without weights.
    Returns ``None`` when the provider raises or there are no POIs; otherwise
    whatever ``_render_density_map`` returns.
    """
    provider = get_provider()
    try:
        competitors = provider.denue(city, business)  # name, lat, lon, category
    except Exception:
        return None
    if competitors.empty:
        return None

    located = competitors[["lon", "lat"]].dropna().copy()
    lonlat = located.to_numpy(dtype=float)
    return _render_density_map(
        kind="competition",
        center=center,
        iso_list=iso_list,
        points_xy=lonlat,
        weights=None,
        search_id=search_id,
        city=city,
        business=business,
    )
|
|
|
|
|
|
# --------------------------- Artefacto GeoJSON por búsqueda ---------------------------
|
|
|
|
def _fc(features: List[Dict[str, Any]]) -> Dict[str, Any]:
|
|
return {"type": "FeatureCollection", "features": features}
|
|
|
|
def _candidates_fc(center: Tuple[float,float],
                   top: List[Tuple[float,float,float,ScoreBreakdown,List[str],List[dict]]]) -> Dict[str, Any]:
    """Build a FeatureCollection of Point features, one per ranked candidate.

    Each entry of ``top`` is ``(lat, lon, score, breakdown, reasons, isochrones)``;
    only the first four are used. Ranks are 1-based and follow the order of
    ``top``. ``is_center`` marks a candidate whose coordinates match ``center``
    (``(lat, lon)``) to within 1e-9.
    """
    def _to_feature(rank: int, entry) -> Dict[str, Any]:
        lat, lon, score, br, _reasons, _iso = entry
        point = {"type": "Point", "coordinates": [float(lon), float(lat)]}
        props = {
            "rank": rank,
            "score": float(score),
            "access": float(br.access),
            "demand": float(br.demand),
            "competition": float(br.competition),
            "is_center": abs(lat - center[0]) < 1e-9 and abs(lon - center[1]) < 1e-9,
        }
        return {"type": "Feature", "geometry": point, "properties": props}

    return _fc([_to_feature(rank, entry) for rank, entry in enumerate(top, start=1)])
|
|
|
|
def _isochrones_fc(iso_list: List[dict]) -> Dict[str, Any]:
    """Collect the ``"feature"`` of every isochrone dict into a FeatureCollection.

    The features are reused as-is, without copying.
    """
    # Features built by _build_isochrones already carry {"minutes", "area_km2"} properties.
    return _fc([band["feature"] for band in iso_list])
|
|
|
|
def _pois_fc(pois_df, poly: Polygon) -> Dict[str, Any]:
    """Build a FeatureCollection of the POIs that fall inside ``poly``.

    Args:
        pois_df: DataFrame-like with ``lon`` and ``lat`` columns (``name`` and
            ``category`` are copied when present); ``None`` or empty gives an
            empty collection.
        poly: polygon used for the containment test.

    Returns:
        A FeatureCollection with at most ``MAX_POIS_GEOJSON`` Point features
        (environment variable, default 1000; an invalid value uses the
        default and a value <= 0 yields no features).
    """
    default_cap = 1000

    feats = []
    if pois_df is None or pois_df.empty:
        return _fc(feats)

    # Parse the cap once, outside the per-row try: inside it, an invalid value
    # was swallowed by the row-level handler and the cap silently never applied.
    try:
        cap = int(os.getenv("MAX_POIS_GEOJSON", str(default_cap)))
    except ValueError:
        cap = default_cap

    for row in pois_df.itertuples(index=False):
        if len(feats) >= cap:
            break
        try:
            lon = float(row.lon); lat = float(row.lat)
            if not poly.contains(Point(lon, lat)):
                continue
            feats.append({
                "type": "Feature",
                "geometry": {"type": "Point", "coordinates": [lon, lat]},
                "properties": {
                    "name": getattr(row, "name", None),
                    "category": getattr(row, "category", None),
                }
            })
        except Exception:
            # Rows with unusable coordinates are skipped.
            continue
    return _fc(feats)
|
|
|
|
def _popgrid_fc(grid_df, poly: Polygon) -> Dict[str, Any]:
    """Build a FeatureCollection of the most populated grid cells inside ``poly``.

    Args:
        grid_df: DataFrame-like with ``lon``, ``lat`` and ``pop`` columns;
            ``None`` or empty gives an empty collection.
        poly: polygon used for the containment test.

    Returns:
        A FeatureCollection of Point features with a ``pop`` property, sorted
        by population descending and limited to ``MAX_POPGRID_GEOJSON`` cells
        (environment variable, default 800; an invalid value uses the default
        and a negative value is treated as 0).
    """
    default_cap = 800

    feats = []
    if grid_df is None or grid_df.empty:
        return _fc(feats)

    # Keep only the cells inside the polygon.
    inside = []
    for row in grid_df.itertuples(index=False):
        try:
            lon = float(row.lon); lat = float(row.lat); pop = float(row.pop)
            # NaN/inf populations would make the sort order arbitrary and end
            # up as non-standard NaN/Infinity tokens in the JSON artifact.
            if not math.isfinite(pop):
                continue
            if poly.contains(Point(lon, lat)):
                inside.append((lon, lat, pop))
        except Exception:
            continue
    if not inside:
        return _fc(feats)

    # Sort by population (descending) and truncate.
    inside.sort(key=lambda t: t[2], reverse=True)
    try:
        cap = int(os.getenv("MAX_POPGRID_GEOJSON", str(default_cap)))
    except ValueError:
        cap = default_cap
    # A negative cap would otherwise slice cells off the tail instead of limiting.
    cap = max(0, cap)

    for lon, lat, pop in inside[:cap]:
        feats.append({
            "type": "Feature",
            "geometry": {"type": "Point", "coordinates": [lon, lat]},
            "properties": {"pop": pop}
        })
    return _fc(feats)
|
|
|
|
def _save_run_artifact(search_id: str,
                       req: SiteSearchRequest,
                       chosen_center: Tuple[float,float],
                       top: List[Tuple[float,float,float,ScoreBreakdown,List[str],List[dict]]],
                       iso_list: List[dict]) -> Optional[str]:
    """Write a JSON artifact describing one search run.

    The file ``run_{search_id}.json`` is written under ``MEDIA_ROOT/sites`` and
    contains:
      - version, created_at (UTC, ISO format with a trailing ``Z``)
      - request (``req.model_dump()``) and center
      - candidates_fc
      - isochrones_fc
      - pois_competition_fc
      - popgrid_fc (a capped sample)

    POIs and grid cells are clipped to the polygon of the band with the most
    minutes; both collections are empty when that polygon is unavailable or
    the provider call fails.

    Returns:
        The artifact path as a string, or ``None`` if anything fails (saving
        is best effort and never raises).
    """
    from datetime import timezone  # local imports keep this block self-contained
    from pathlib import Path

    try:
        # Path() so that a plain-string MEDIA_ROOT works as well as a Path.
        media_dir = Path(settings.MEDIA_ROOT) / "sites"
        media_dir.mkdir(parents=True, exist_ok=True)
        out_path = media_dir / f"run_{search_id}.json"

        # Largest polygon, used for clipping.
        largest = max(iso_list, key=lambda x: x["minutes"]) if iso_list else None
        poly = _polygon_from_feature(largest["feature"]) if largest else None

        prov = get_provider()
        try:
            pois = prov.denue(req.city, req.business)
        except Exception:
            pois = None
        try:
            grid = prov.popgrid(req.city)
        except Exception:
            grid = None

        # datetime.utcnow() is deprecated; take an aware UTC "now" and drop the
        # tzinfo so the serialized text keeps the exact "<naive iso>Z" shape.
        created_at = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"

        artifact = {
            "version": "sites-artifact-1",
            "created_at": created_at,
            "request": req.model_dump(),
            "center": {"lat": chosen_center[0], "lon": chosen_center[1]},
            "candidates_fc": _candidates_fc(chosen_center, top),
            "isochrones_fc": _isochrones_fc(iso_list),
            "pois_competition_fc": _pois_fc(pois, poly) if poly is not None else _fc([]),
            "popgrid_fc": _popgrid_fc(grid, poly) if poly is not None else _fc([]),
        }
        with open(out_path, "w", encoding="utf-8") as f:
            json.dump(artifact, f, ensure_ascii=False)
        return str(out_path)
    except Exception:
        return None
|
|
|
|
|
|
# --------------------------- Estimador principal ---------------------------
|
|
|
|
def _score_point(req: SiteSearchRequest, iso_list: List[dict],
                 w_access: float, w_demand: float, w_comp: float
                 ) -> Tuple[float, ScoreBreakdown, List[str]]:
    """Score one location from its isochrones; returns ``(score, breakdown, reasons)``."""
    access, access_r = _access_from_iso_list(iso_list)
    comp, comp_r = _competition_from_pois(req.city, req.business, iso_list)
    dem, dem_r = _demand_from_popgrid(req.city, iso_list)
    score = w_access * access + w_demand * dem + w_comp * comp
    score = float(max(0.0, min(1.0, score)))
    breakdown = ScoreBreakdown(demand=dem, competition=comp, access=access)
    reasons = (["Access from isochrone areas (normalized avg)"] + access_r +
               ["Competition from POI density (largest band)"] + comp_r +
               ["Demand from population grid (largest band)"] + dem_r)
    return score, breakdown, reasons


def run_site_search(req: SiteSearchRequest) -> SiteSearchResponse:
    """Run a site search and return ranked candidate sites with map artifacts.

    Behavior by request:
      - No ``req.center``: returns ``req.max_candidates`` neutral stub
        candidates and the warning ``no_center_provided_stub_output``.
      - Center given but no usable polygon for its largest band: only the
        center is scored (warning ``sampling_fallback_invalid_polygon``).
      - Otherwise: ``req.num_samples`` points are sampled inside the largest
        band's polygon, the center and every sample are scored, and the best
        ``max(1, req.max_candidates)`` are returned (warning
        ``multi_candidate_sampling_ok`` plus one saved/failed warning per
        heat map).

    The score is ``w_access*access + w_demand*demand + w_comp*competition``
    clamped to [0, 1]; the weights come from the ``WEIGHT_ACCESS``,
    ``WEIGHT_DEMAND`` and ``WEIGHT_COMP`` environment variables (defaults
    0.35 / 0.40 / 0.25). Maps and the JSON run artifact are written as a side
    effect when a center is given.
    """
    search_id = str(uuid.uuid4())
    warnings: List[str] = []
    candidates: List[CandidateSite] = []
    map_url: str | None = None
    demand_map_url: Optional[str] = None
    competition_map_url: Optional[str] = None

    w_access = float(os.getenv("WEIGHT_ACCESS", "0.35"))
    w_demand = float(os.getenv("WEIGHT_DEMAND", "0.40"))
    w_comp = float(os.getenv("WEIGHT_COMP", "0.25"))

    if req.center:
        center = (float(req.center[0]), float(req.center[1]))
        bands = req.time_bands or []
        base_iso = _build_isochrones(center, bands)
        largest = max(base_iso, key=lambda x: x["minutes"]) if base_iso else None
        poly = _polygon_from_feature(largest["feature"]) if largest else None

        if poly is None:
            # Nothing to sample from: score the center alone.
            score, breakdown, reasons = _score_point(req, base_iso, w_access, w_demand, w_comp)
            candidates.append(CandidateSite(lat=center[0], lon=center[1], score=score,
                                            breakdown=breakdown, reasons=reasons))
            map_url = _save_sites_map(center, base_iso, search_id, req.city, req.business,
                                      [(center[0], center[1], score)])
            warnings.append("sampling_fallback_invalid_polygon")
            demand_map_url = _render_demand_map(center, base_iso, req.city, search_id, req.business)
            competition_map_url = _render_competition_map(center, base_iso, req.city, req.business, search_id)
            # Artifact (center only).
            _save_run_artifact(
                search_id, req, center,
                [(center[0], center[1], score, breakdown, reasons, base_iso)],
                base_iso
            )
        else:
            # Seed derived from the search id, so sampling is tied to this run.
            rng = random.Random(int(search_id.replace("-", ""), 16) & 0xFFFFFFFF)
            samples = _sample_points_in_polygon(poly, int(req.num_samples), rng)
            cand_points: List[Tuple[float, float]] = [center] + samples

            scored: List[Tuple[float, float, float, ScoreBreakdown, List[str], List[dict]]] = []
            for idx, (lat, lon) in enumerate(cand_points):
                # The center's isochrones were already fetched above; reuse
                # them instead of spending another routing request.
                iso_list = base_iso if idx == 0 else _build_isochrones((lat, lon), bands)
                score, breakdown, reasons = _score_point(req, iso_list, w_access, w_demand, w_comp)
                scored.append((lat, lon, score, breakdown, reasons, iso_list))

            scored.sort(key=lambda t: t[2], reverse=True)
            top = scored[: max(1, int(req.max_candidates))]

            for (lat, lon, score, breakdown, reasons, _iso) in top:
                candidates.append(CandidateSite(
                    lat=lat, lon=lon, score=score, breakdown=breakdown, reasons=reasons
                ))

            best_point = (top[0][0], top[0][1])
            top1_iso = top[0][5]
            top_points = [(lat, lon, score) for (lat, lon, score, *_rest) in top]
            map_url = _save_sites_map(best_point, top1_iso, search_id,
                                      req.city, req.business, top_points)
            warnings.append("multi_candidate_sampling_ok")

            demand_map_url = _render_demand_map(best_point, top1_iso, req.city, search_id, req.business)
            competition_map_url = _render_competition_map(best_point, top1_iso, req.city, req.business, search_id)

            warnings.append("demand_map_saved" if demand_map_url else "demand_map_failed")
            warnings.append("competition_map_saved" if competition_map_url else "competition_map_failed")

            # Artifact (Top-K + isochrones of the Top-1).
            _save_run_artifact(search_id, req, best_point, top, top1_iso)
    else:
        neutral = ScoreBreakdown(demand=0.5, competition=0.5, access=0.5)
        for i in range(req.max_candidates):
            candidates.append(CandidateSite(
                lat=0.0, lon=0.0, score=0.5,
                breakdown=neutral,
                reasons=[f"stub candidate #{i+1} for {req.business} in {req.city}"],
            ))
        warnings.append("no_center_provided_stub_output")

    return SiteSearchResponse(
        search_id=search_id,
        city=req.city,
        business=req.business,
        time_bands=req.time_bands,
        candidates=candidates,
        map_url=map_url,
        demand_map_url=demand_map_url,
        competition_map_url=competition_map_url,
        data_release=req.data_release,
        warnings=warnings,
    )
|