# (file-viewer header, not part of the module) 258 lines · 8.9 KiB · Python

from __future__ import annotations #
import json
import re
import requests
from typing import Any, Dict, Tuple, Optional, List
from django.conf import settings
from django.http import JsonResponse, HttpRequest
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_http_methods
# ---------- helpers ----------
def _base(request: HttpRequest) -> str:
    """Return the absolute URI for "/" with its final character removed.

    The absolute root URI normally ends in "/"; dropping that last character
    leaves a base that paths starting with "/" can be appended to.
    """
    root_uri = request.build_absolute_uri("/")
    return root_uri[:-1]
def _load_body(request: HttpRequest) -> Dict[str, Any]:
    """Decode the request body as a JSON object.

    Args:
        request: Incoming request; only ``request.body`` is read.

    Returns:
        The parsed JSON object, or ``{}`` when the body is empty, cannot be
        decoded as UTF-8 / JSON, or decodes to anything other than a JSON
        object (list, string, number, ``null``).
    """
    try:
        raw = (request.body or b"").decode("utf-8")
        parsed = json.loads(raw or "{}")
    except Exception:
        # Deliberately best effort: an unreadable or malformed body is treated
        # the same as an absent one instead of raising.
        return {}
    # Callers use ``.get(...)`` on the result right away, so only a dict is
    # acceptable; valid-but-non-object JSON would otherwise crash them.
    return parsed if isinstance(parsed, dict) else {}
# Fallback request values; _apply_sites_defaults fills these in for fields the
# caller did not supply.
_SITES_DEFAULTS = {
    "city": "CDMX",  # used when the payload carries no (truthy) city
    "business": "all",
    "time_bands": [10, 20],  # units are not stated here -- presumably minutes; confirm
    # Fallback center per upper-cased city name; the tuple is copied into the
    # payload's two-element "center" list in the same order ([lat, lon] there).
    "center_by_city": {
        "CDMX": (19.4326, -99.1332),
    },
}
def _parse_time_bands(raw: str) -> list[int]:
    """Extract the positive integers from a comma/whitespace separated string.

    Pieces that are empty, that ``int`` cannot parse, or that are not greater
    than zero are skipped. Order and duplicates are preserved.
    """

    def _as_positive(piece: str) -> Optional[int]:
        # None marks "skip this piece" (unparsable or non-positive).
        try:
            number = int(piece)
        except Exception:
            return None
        return number if number > 0 else None

    pieces = re.split(r"[,\s]+", (raw or "").strip())
    converted = (_as_positive(piece) for piece in pieces if piece)
    return [number for number in converted if number is not None]
def _parse_latlon(raw: str) -> Optional[List[float]]:
    """Parse a ``"lat,lon"`` string into ``[lat, lon]``.

    Args:
        raw: Text holding exactly two comma-separated numbers; surrounding
            whitespace around each number is ignored.

    Returns:
        ``[lat, lon]`` as floats, or ``None`` when the text does not have
        exactly two parts, a part is not a number, or the values are not a
        plausible coordinate (latitude outside [-90, 90], longitude outside
        [-180, 180], NaN or infinite).
    """
    parts = [p.strip() for p in (raw or "").split(",")]
    if len(parts) != 2:
        return None
    try:
        lat = float(parts[0])
        lon = float(parts[1])
    except ValueError:
        return None
    # ``float`` happily parses "nan" and "inf". Chained range comparisons are
    # False for NaN and exclude +/-inf, so this single check rejects all of
    # them along with ordinary out-of-range values.
    if not (-90.0 <= lat <= 90.0 and -180.0 <= lon <= 180.0):
        return None
    return [lat, lon]
def _parse_sites_shorthand(body: Dict[str, Any]) -> Tuple[Dict[str, Any], str]:
    """Parse the free-form shorthand carried in ``body["input"]["args_raw"]``.

    A leading ``/command`` is stripped, the rest is split on whitespace and
    each token is interpreted as follows:

    * ``key=value`` or ``key:value`` -- recognised keys set ``city``,
      ``business``, ``time_bands``, ``max_candidates`` (clamped to >= 1),
      ``center`` (``"lat,lon"``) or the separate ``lat`` / ``lon`` values.
      Unknown keys and unparsable values are ignored.
    * digits, optionally comma separated (``10,20``) -- more time bands.
    * an all-uppercase alphabetic word -- the city, if none is set yet.
    * anything else -- the business; further words are joined with ``_``.

    Args:
        body: Decoded request body.

    Returns:
        ``(payload, "shorthand")``, or ``({}, "empty")`` when there is no
        usable ``args_raw`` text (missing, blank, not a string, or ``input``
        is not an object).
    """
    # The body comes from untrusted JSON: ``input`` may be any JSON value and
    # ``args_raw`` may not be a string, so check types before using them.
    envelope = body.get("input") if isinstance(body, dict) else None
    args_raw = envelope.get("args_raw") if isinstance(envelope, dict) else None
    if not isinstance(args_raw, str):
        return {}, "empty"
    cleaned = re.sub(r"^/\w+\s*", "", args_raw).strip()
    if not cleaned:
        return {}, "empty"

    payload: Dict[str, Any] = {}
    time_bands: List[int] = []
    lat_val: Optional[float] = None
    lon_val: Optional[float] = None

    for tok in cleaned.split():
        # "=" takes precedence over ":" when a token contains both.
        if "=" in tok:
            sep = "="
        elif ":" in tok:
            sep = ":"
        else:
            sep = ""

        if sep:
            key, val = tok.split(sep, 1)
            key = key.strip().lower()
            val = val.strip()
            if key in {"city", "c"}:
                payload["city"] = val
            elif key in {"business", "biz", "b", "category", "cat"}:
                payload["business"] = val
            elif key in {"time", "times", "band", "bands", "time_bands", "tb"}:
                time_bands.extend(_parse_time_bands(val))
            elif key in {"max", "max_candidates", "k", "top"}:
                try:
                    payload["max_candidates"] = max(1, int(val))
                except ValueError:
                    pass  # not an integer: leave max_candidates unset
            elif key in {"center", "ctr"}:
                latlon = _parse_latlon(val)
                if latlon:
                    payload["center"] = latlon
            elif key in {"lat", "latitude"}:
                try:
                    lat_val = float(val)
                except ValueError:
                    pass  # keep any earlier valid latitude
            elif key in {"lon", "lng", "longitude"}:
                try:
                    lon_val = float(val)
                except ValueError:
                    pass  # keep any earlier valid longitude
            # A keyed token is never reinterpreted as a bare word, even when
            # its key is unknown.
            continue

        if re.fullmatch(r"\d+(?:,\d+)*", tok):
            time_bands.extend(_parse_time_bands(tok))
            continue

        if tok.isalpha() and tok.isupper() and "city" not in payload:
            payload["city"] = tok
            continue

        if "business" not in payload:
            payload["business"] = tok
        else:
            payload["business"] = f"{payload['business']}_{tok}"

    if time_bands:
        payload["time_bands"] = time_bands
    # An explicit center= wins over separate lat=/lon=. The range checks also
    # reject NaN and infinities, which ``float`` accepts as text.
    if (
        "center" not in payload
        and lat_val is not None
        and lon_val is not None
        and -90.0 <= lat_val <= 90.0
        and -180.0 <= lon_val <= 180.0
    ):
        payload["center"] = [lat_val, lon_val]
    return payload, "shorthand"
def _apply_sites_defaults(payload: Dict[str, Any], body: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of ``payload`` with missing site-scoring fields filled in.

    Fill order:

    1. ``center`` from ``body["input"]["location"]`` when both ``lat`` and
       ``lon`` are present there and the payload has no center.
    2. ``city`` (stripped), ``business`` and ``time_bands`` from
       ``_SITES_DEFAULTS`` when absent or empty.
    3. ``center`` from the per-city fallback table when still missing.

    Args:
        payload: Parsed request payload; it is not mutated. A non-dict value
            is treated as empty.
        body: Decoded request body; a non-dict ``input`` or ``location`` is
            ignored.

    Returns:
        A new dict with the defaults applied.
    """
    out = dict(payload) if isinstance(payload, dict) else {}

    # Payload and body originate from untrusted JSON, so every nested value
    # is type-checked before ``.get`` / ``.strip`` is called on it.
    envelope = body.get("input") if isinstance(body, dict) else None
    loc = envelope.get("location") if isinstance(envelope, dict) else None
    if not isinstance(loc, dict):
        loc = {}
    if "center" not in out and loc.get("lat") is not None and loc.get("lon") is not None:
        out["center"] = [loc["lat"], loc["lon"]]

    city = out.get("city")
    if not isinstance(city, str) or not city.strip():
        # Missing, non-text or blank city: use the default rather than
        # crashing on ``.strip`` or keeping an empty name.
        city = _SITES_DEFAULTS["city"]
    city = city.strip()
    out["city"] = city

    if not out.get("business"):
        out["business"] = _SITES_DEFAULTS["business"]
    if not out.get("time_bands"):
        # Copy so callers can never mutate the shared default list.
        out["time_bands"] = list(_SITES_DEFAULTS["time_bands"])
    if "center" not in out:
        fallback = _SITES_DEFAULTS["center_by_city"].get(city.upper())
        if fallback:
            out["center"] = [fallback[0], fallback[1]]
    return out
def _extract_payload(body: Dict[str, Any]) -> Tuple[Dict[str, Any], str]:
    """Pull the agent payload out of a decoded request body.

    Accepted shapes:

    1. ``{"payload": {...}}`` -- the payload is taken as is.
    2. An envelope whose ``input.args_raw`` is a command followed by a JSON
       object, e.g. ``"/sami {...}"``.

    Args:
        body: Decoded request body.

    Returns:
        ``(payload, src)`` where ``src`` is ``"payload"``, ``"args_raw"`` or
        ``"empty"``. ``payload`` is always a dict; it is ``{}`` when ``src``
        is ``"empty"``.
    """
    if not isinstance(body, dict):
        return {}, "empty"

    # 1) direct payload
    direct = body.get("payload")
    if isinstance(direct, dict):
        return direct, "payload"

    # 2) envelope: parse the JSON that follows the command in args_raw
    envelope = body.get("input")
    args_raw = envelope.get("args_raw") if isinstance(envelope, dict) else None
    if isinstance(args_raw, str):
        # strip the leading "/sami " or "/sites "
        cleaned = re.sub(r"^/\w+\s*", "", args_raw).strip()
        if cleaned:
            try:
                parsed = json.loads(cleaned)
            except (ValueError, RecursionError):
                parsed = None
            # Text such as "/sites 15" is valid JSON (a number) but is not a
            # payload; only a JSON object counts, everything else is "empty".
            if isinstance(parsed, dict):
                return parsed, "args_raw"
    return {}, "empty"
def _resolve_internal_base(request: HttpRequest) -> str:
    """Pick the base URL used for internal agent calls.

    A non-blank ``settings.AGENTS_INTERNAL_BASE`` wins (trailing slashes
    removed). Otherwise the base is chosen from the port suffix of the
    request host, falling back to ``http://127.0.0.1:8002``.
    """
    configured = (getattr(settings, "AGENTS_INTERNAL_BASE", "") or "").strip()
    if configured:
        return configured.rstrip("/")

    # Host suffix -> internal base. ":8010" resolves to the same address as
    # the final fallback; it is listed to keep the mapping explicit.
    port_routes = (
        (":8011", "http://127.0.0.1:8000"),
        (":8010", "http://127.0.0.1:8002"),
    )
    host_name = (request.get_host() or "").lower()
    for suffix, target in port_routes:
        if host_name.endswith(suffix):
            return target
    return "http://127.0.0.1:8002"
def _post_execute(request: HttpRequest, agent: str, payload: Dict[str, Any], timeout: float = 30.0):
    """POST ``{"agent": ..., "payload": ...}`` to the internal execute endpoint.

    Returns a ``(status, data)`` pair:

    * the upstream status code with its decoded JSON body, or with a
      ``NON_JSON`` dict holding the first 2000 characters of the response
      text when the body is not JSON;
    * ``504`` / ``UPSTREAM_TIMEOUT`` when the request times out;
    * ``500`` / ``EXEC_ERROR`` for any other failure while calling upstream.
    """
    endpoint = f"{_resolve_internal_base(request)}/api/agents/execute"
    envelope = {"agent": agent, "payload": payload}
    try:
        response = requests.post(endpoint, json=envelope, timeout=timeout)
        # Decode the body whatever the status code is.
        try:
            parsed = response.json()
        except Exception:
            parsed = {"code": "NON_JSON", "message": response.text[:2000]}
        return response.status_code, parsed
    except requests.Timeout:
        return 504, {"code": "UPSTREAM_TIMEOUT", "message": "agent upstream timed out"}
    except Exception as exc:
        return 500, {"code": "EXEC_ERROR", "message": str(exc)}
# ---------- text builders ----------
def _text_sami(data: Dict[str, Any]) -> str:
    """Build the plain-text summary for a SAMI response.

    When both ``beta`` and ``r2`` are present: a header line, up to three
    residual rows ordered by ``rank`` (rows without a rank sort last) and,
    if set, a blank line followed by ``share_url``. Otherwise an error line
    when ``code`` is set, else a generic message.
    """
    if not ("beta" in data and "r2" in data):
        if data.get("code"):
            return f"⚠️ {data.get('code')}: {data.get('message','')}"
        return "SAMI results ready."

    out = [f"SAMI run: β={data['beta']:.3f}, R²={data['r2']:.3f}"]
    by_rank = sorted(data.get("residuals") or [], key=lambda row: row.get("rank", 1e9))
    out.extend(
        f"{row.get('rank')}. {row.get('city')}: {row.get('sami',0):+0.2f}"
        for row in by_rank[:3]
    )
    share = data.get("share_url")
    if share:
        out.extend(["", share])
    return "\n".join(out)
def _text_sites(data: Dict[str, Any]) -> str:
    """Build the plain-text summary for a site-scoring response.

    When ``candidates`` is a list: a header line, up to three numbered
    candidate rows (score plus lat/lon, each defaulting to 0) and then any
    of the share / GeoJSON URLs that are set. Otherwise an error line when
    ``code`` is set, else a generic message.
    """
    candidates = data.get("candidates")
    if not isinstance(candidates, list):
        if data.get("code"):
            return f"⚠️ {data.get('code')}: {data.get('message','')}"
        return "Site scoring ready."

    city = data.get("city", "?")
    business = data.get("business", "?")
    out = [f"Top sites for {business} in {city}:"]
    for position, cand in enumerate(candidates[:3], 1):
        lat = cand.get("lat", 0)
        lon = cand.get("lon", 0)
        score = cand.get("score", 0)
        out.append(f"{position}. score={score:.2f} @ ({lat:.5f},{lon:.5f})")

    url_keys = ("share_url", "isochrones_geojson_url", "candidates_geojson_url")
    out.extend(data[key] for key in url_keys if data.get(key))
    return "\n".join(out)
# ---------- views ----------
@csrf_exempt
@require_http_methods(["GET", "POST"])
def format_sami(request: HttpRequest):
    """Run the ``sami`` agent for this request and add a text summary.

    The payload is taken from the request body (direct ``payload`` or JSON in
    ``input.args_raw``), forwarded to the internal execute endpoint, and the
    upstream result is returned with two additions: ``_echo`` (payload source
    and keys, unless upstream already set it) and ``text`` (human-readable
    summary, best effort).

    Returns:
        A ``JsonResponse`` carrying the upstream status code.
    """
    body = _load_body(request)
    if not isinstance(body, dict):
        # A JSON body that is not an object carries no usable fields.
        body = {}
    payload, src = _extract_payload(body)
    if not isinstance(payload, dict):
        # e.g. "/sami 5": valid JSON but not an object, so there is no payload.
        payload, src = {}, "empty"

    status, data = _post_execute(request, "sami", payload, timeout=30.0)
    # add echo + text
    data = data if isinstance(data, dict) else {"result": data}
    data.setdefault("_echo", {"src": src, "payload_keys": list(payload.keys())})
    try:
        data["text"] = _text_sami(data)
    except Exception:
        # Best effort: an unexpected upstream shape must not fail the
        # response, it only goes out without the "text" field.
        pass
    return JsonResponse(data, status=status, safe=False)
@csrf_exempt
@require_http_methods(["GET", "POST"])
def format_sites(request: HttpRequest):
    """Run the ``sites`` agent for this request and add a text summary.

    The payload is taken from the request body (direct ``payload`` or JSON in
    ``input.args_raw``); when neither yields a payload the shorthand parser is
    tried. Defaults are then applied, the payload is forwarded to the internal
    execute endpoint, and the upstream result is returned with ``_echo``
    (payload source and keys, unless upstream already set it) and ``text``
    (human-readable summary, best effort) added.

    Returns:
        A ``JsonResponse`` carrying the upstream status code.
    """
    body = _load_body(request)
    if not isinstance(body, dict):
        # A JSON body that is not an object carries no usable fields.
        body = {}
    payload, src = _extract_payload(body)
    if not isinstance(payload, dict):
        # e.g. "/sites 15" is valid JSON (a number) but not a payload; treat
        # it as empty so the shorthand parser below gets to read it.
        payload, src = {}, "empty"
    if not payload and src == "empty":
        payload, src = _parse_sites_shorthand(body)
    payload = _apply_sites_defaults(payload, body)

    status, data = _post_execute(request, "sites", payload, timeout=30.0)
    data = data if isinstance(data, dict) else {"result": data}
    data.setdefault("_echo", {"src": src, "payload_keys": list(payload.keys())})
    try:
        data["text"] = _text_sites(data)
    except Exception:
        # Best effort: an unexpected upstream shape must not fail the
        # response, it only goes out without the "text" field.
        pass
    return JsonResponse(data, status=status, safe=False)