Ekaropolus 4d788c1acc
All checks were successful
continuous-integration/drone/push Build is passing
Better format for site agent response to telegram bots
2026-01-03 19:56:05 -06:00

291 lines
10 KiB
Python

from __future__ import annotations

import json
import logging
import math
import re
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import urlparse

import requests
from django.conf import settings
from django.http import HttpRequest, JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_http_methods

from core.urlbuild import public_base, public_url

# ---------- helpers ----------
def _base(request: HttpRequest) -> str:
    """Return the absolute URI of the site root minus its last character.

    The root URI is built with ``request.build_absolute_uri("/")``; it is
    presumably of the form ``scheme://host/``, so dropping the final
    character removes the trailing slash.
    """
    root_uri = request.build_absolute_uri("/")
    return root_uri[:-1]
def _load_body(request: HttpRequest) -> Dict[str, Any]:
    """Decode the request body as a JSON object.

    Args:
        request: Incoming request; only ``request.body`` is read.

    Returns:
        The parsed JSON object. An empty dict is returned when the body is
        empty, cannot be read or decoded as UTF-8, is not valid JSON, or is
        valid JSON whose top-level value is not an object.
    """
    try:
        raw = (request.body or b"").decode("utf-8")
        parsed = json.loads(raw or "{}")
    except Exception:
        # Deliberately best-effort: any failure to read or parse the body is
        # treated the same as "no body".
        return {}
    # Valid JSON is not necessarily an object ("[1, 2]", "3", '"x"'); the
    # declared contract is a dict, so anything else counts as empty.
    return parsed if isinstance(parsed, dict) else {}
# Fallback values used by _apply_sites_defaults when a sites payload omits a
# field.
_SITES_DEFAULTS: Dict[str, Any] = {
    # City used when the payload names none.
    "city": "CDMX",
    # Business category used when the payload names none.
    "business": "all",
    # Default time bands (presumably travel-time minutes -- TODO confirm).
    "time_bands": [10, 20],
    # Fallback (lat, lon) center, looked up by upper-cased city name.
    "center_by_city": {"CDMX": (19.4326, -99.1332)},
}
def _parse_time_bands(raw: str) -> list[int]:
    """Extract the positive integers from a comma/whitespace separated string.

    Entries that are not integers, or that are zero or negative, are skipped.
    Order and duplicates are preserved. A falsy ``raw`` yields an empty list.
    """
    text = (raw or "").strip()
    found: list[int] = []
    # Any run of commas and/or whitespace is a separator, so the entries are
    # exactly the runs of characters in between.
    for chunk in re.findall(r"[^,\s]+", text):
        try:
            number = int(chunk)
        except ValueError:
            continue
        if number > 0:
            found.append(number)
    return found
def _parse_latlon(raw: str) -> Optional[List[float]]:
    """Parse a ``"lat,lon"`` string into ``[lat, lon]``.

    Args:
        raw: Text holding exactly two comma-separated numbers; whitespace
            around each number is ignored.

    Returns:
        ``[lat, lon]`` as floats, or ``None`` when ``raw`` is falsy, does not
        contain exactly two parts, a part is not a number, or a part is not
        finite (``nan`` / ``inf``).
    """
    parts = [piece.strip() for piece in (raw or "").split(",")]
    if len(parts) != 2:
        return None
    try:
        lat = float(parts[0])
        lon = float(parts[1])
    except ValueError:
        return None
    # float() happily parses "nan" and "inf"; neither is a usable coordinate
    # and neither survives strict JSON encoding.
    if not (math.isfinite(lat) and math.isfinite(lon)):
        return None
    return [lat, lon]
def _parse_finite_float(text: str) -> Optional[float]:
    """Return ``float(text)`` if it parses to a finite number, else ``None``."""
    try:
        number = float(text)
    except ValueError:
        return None
    return number if math.isfinite(number) else None


def _parse_sites_shorthand(body: Dict[str, Any]) -> Tuple[Dict[str, Any], str]:
    """Parse the free-text form of a sites command into a payload dict.

    The text is read from ``body["input"]["args_raw"]``; a leading
    ``/command`` word is stripped and the rest is split on whitespace.
    Each token is interpreted as follows:

    * ``key=value`` or ``key:value`` -- recognised keys (with aliases) set
      ``city``, ``business``, ``time_bands``, ``max_candidates`` (at least 1),
      ``center`` (``lat,lon``), or a separate ``lat`` / ``lon``.  Unknown keys
      and unparsable values are ignored.
    * digits, optionally comma-separated (``10`` or ``10,20``) -- time bands.
    * an all-uppercase alphabetic word -- the city, if none was set yet.
    * anything else -- the business; several such tokens are joined with
      ``_``.

    Args:
        body: Decoded request body.

    Returns:
        ``(payload, src)``. ``src`` is ``"empty"`` (with an empty payload)
        when there is no usable text, including when ``input`` is not a dict
        or ``args_raw`` is not a string; otherwise it is ``"shorthand"``.
    """
    # The body comes straight from the HTTP request, so neither the envelope
    # nor args_raw can be trusted to have the expected type.
    envelope = body.get("input")
    args_raw = envelope.get("args_raw") if isinstance(envelope, dict) else None
    if not isinstance(args_raw, str):
        return {}, "empty"

    cleaned = re.sub(r"^/\w+\s*", "", args_raw).strip()
    if not cleaned:
        return {}, "empty"

    payload: Dict[str, Any] = {}
    time_bands: list[int] = []
    lat_val: Optional[float] = None
    lon_val: Optional[float] = None

    for tok in cleaned.split():
        key = None
        val = None
        if "=" in tok:
            key, val = tok.split("=", 1)
        elif ":" in tok:
            key, val = tok.split(":", 1)

        if key is not None:
            key = key.strip().lower()
            val = (val or "").strip()
            if key in {"city", "c"}:
                payload["city"] = val
            elif key in {"business", "biz", "b", "category", "cat"}:
                payload["business"] = val
            elif key in {"time", "times", "band", "bands", "time_bands", "tb"}:
                time_bands.extend(_parse_time_bands(val))
            elif key in {"max", "max_candidates", "k", "top"}:
                try:
                    payload["max_candidates"] = max(1, int(val))
                except ValueError:
                    pass  # not an integer: keep whatever was set before
            elif key in {"center", "ctr"}:
                latlon = _parse_latlon(val)
                if latlon:
                    payload["center"] = latlon
            elif key in {"lat", "latitude"}:
                parsed = _parse_finite_float(val)
                if parsed is not None:
                    lat_val = parsed
            elif key in {"lon", "lng", "longitude"}:
                parsed = _parse_finite_float(val)
                if parsed is not None:
                    lon_val = parsed
            # Keyed tokens never fall through to positional handling, even
            # when the key is unknown.
            continue

        if re.fullmatch(r"\d+(?:,\d+)*", tok):
            time_bands.extend(_parse_time_bands(tok))
            continue

        if tok.isalpha() and tok.isupper() and "city" not in payload:
            payload["city"] = tok
            continue

        if "business" not in payload:
            payload["business"] = tok
        else:
            payload["business"] = f"{payload['business']}_{tok}"

    if time_bands:
        payload["time_bands"] = time_bands
    # An explicit center=... wins over separate lat=/lon= values.
    if lat_val is not None and lon_val is not None and "center" not in payload:
        payload["center"] = [lat_val, lon_val]
    return payload, "shorthand"
def _normalize_urls_to_public(data: Dict[str, Any], request: HttpRequest) -> None:
    """Rewrite known URL fields of ``data`` in place so they use the public base.

    Only the top-level keys listed below are examined, and only non-empty
    string values are touched:

    * a value starting with ``/`` is replaced by ``public_url(value, request)``;
    * an ``http``/``https`` URL whose host is ``127.0.0.1`` or ``localhost``
      keeps its path, query and fragment but gets ``public_base(request)`` as
      its prefix;
    * every other value is left unchanged.

    Nothing happens when ``data`` is not a dict.
    """
    if not isinstance(data, dict):
        return

    base = public_base(request)
    url_fields = (
        "share_url",
        "map_url",
        "demand_map_url",
        "competition_map_url",
        "main_download_url",
        "demand_download_url",
        "competition_download_url",
        "main_preview_url",
        "demand_preview_url",
        "competition_preview_url",
        "isochrones_geojson_url",
        "candidates_geojson_url",
        "pois_competition_geojson_url",
        "popgrid_geojson_url",
        "chart_url",
    )
    for field in url_fields:
        value = data.get(field)
        if not (isinstance(value, str) and value):
            continue

        # Site-relative path: let the URL builder make it absolute.
        if value.startswith("/"):
            data[field] = public_url(value, request)
            continue

        try:
            parsed = urlparse(value)
        except Exception:
            continue
        if parsed.scheme not in ("http", "https"):
            continue
        if parsed.hostname not in {"127.0.0.1", "localhost"}:
            continue

        # Loopback URL: swap scheme/host for the public base, keep the rest.
        query = f"?{parsed.query}" if parsed.query else ""
        fragment = f"#{parsed.fragment}" if parsed.fragment else ""
        data[field] = f"{base}{parsed.path}{query}{fragment}"
def _apply_sites_defaults(payload: Dict[str, Any], body: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of ``payload`` with missing sites fields filled in.

    Fill order:

    1. ``center`` -- taken from ``body["input"]["location"]`` (``lat`` and
       ``lon``) when the payload has none and both values are present.
    2. ``city`` -- stripped of surrounding whitespace; a missing, blank or
       non-string city becomes the default city.
    3. ``business`` and ``time_bands`` -- defaults when missing or empty.
    4. ``center`` -- if still missing, the default center registered for the
       upper-cased city, when there is one.

    Args:
        payload: Parsed sites payload; it is not modified. Anything that is
            not a dict is treated as an empty payload.
        body: Decoded request body; only ``input.location`` is read.

    Returns:
        A new dict with the defaults applied.
    """
    out: Dict[str, Any] = dict(payload) if isinstance(payload, dict) else {}

    # The body is untrusted request data: tolerate a missing or mistyped
    # envelope/location instead of raising.
    envelope = body.get("input")
    loc = envelope.get("location") if isinstance(envelope, dict) else None
    if not isinstance(loc, dict):
        loc = {}
    if "center" not in out and loc.get("lat") is not None and loc.get("lon") is not None:
        out["center"] = [loc["lat"], loc["lon"]]

    raw_city = out.get("city")
    city = raw_city.strip() if isinstance(raw_city, str) else ""
    if not city:
        city = _SITES_DEFAULTS["city"]
    out["city"] = city

    if not out.get("business"):
        out["business"] = _SITES_DEFAULTS["business"]
    if not out.get("time_bands"):
        # Copy so callers can never mutate the shared default list.
        out["time_bands"] = list(_SITES_DEFAULTS["time_bands"])
    if "center" not in out:
        fallback = _SITES_DEFAULTS["center_by_city"].get(city.upper())
        if fallback:
            out["center"] = [fallback[0], fallback[1]]
    return out
def _extract_payload(body: Dict[str, Any]) -> Tuple[Dict[str, Any], str]:
    """Pull the agent payload out of a decoded request body.

    Accepts:
      1) {"payload": {...}}
      2) full canonical envelope with .input.args_raw="/sami {...}"

    Args:
        body: Decoded request body.

    Returns:
        ``(payload, src)`` where ``src`` is ``'payload'``, ``'args_raw'`` or
        ``'empty'``.  The payload is always a dict: text after the command
        that is not a JSON *object* (plain words, or JSON scalars/arrays such
        as ``15``) yields ``({}, 'empty')`` so the caller can fall back to
        other parsing.
    """
    # 1) direct payload
    direct = body.get("payload")
    if isinstance(direct, dict):
        return direct, "payload"

    # 2) canonical envelope: parse JSON after the command in args_raw
    envelope = body.get("input")
    args_raw = envelope.get("args_raw") if isinstance(envelope, dict) else None
    if isinstance(args_raw, str):
        # strip leading "/sami " or "/sites "
        cleaned = re.sub(r"^/\w+\s*", "", args_raw).strip()
        if cleaned:
            try:
                parsed = json.loads(cleaned)
            except (ValueError, RecursionError):
                parsed = None
            # "15", "[1, 2]" and '"x"' are valid JSON but not a payload.
            if isinstance(parsed, dict):
                return parsed, "args_raw"
    return {}, "empty"
def _resolve_internal_base(request: HttpRequest) -> str:
    """Pick the base URL of the internal agents service.

    ``settings.AGENTS_INTERNAL_BASE`` wins when it is set to a non-blank
    value (trailing slashes removed).  Otherwise the base is chosen from the
    port suffix of the request host, falling back to
    ``http://127.0.0.1:8002``.
    """
    configured = (getattr(settings, "AGENTS_INTERNAL_BASE", "") or "").strip()
    if configured:
        return configured.rstrip("/")

    # Request-host port suffix -> internal base URL.
    port_routes = (
        (":8011", "http://127.0.0.1:8000"),
        (":8010", "http://127.0.0.1:8002"),
    )
    host = (request.get_host() or "").lower()
    for suffix, target in port_routes:
        if host.endswith(suffix):
            return target
    return "http://127.0.0.1:8002"
def _post_execute(request: HttpRequest, agent: str, payload: Dict[str, Any], timeout: float = 30.0):
    """POST ``{"agent", "payload"}`` to the internal execute endpoint.

    Args:
        request: Current request, used to resolve the internal base URL.
        agent: Agent name sent in the JSON body.
        payload: Agent payload sent in the JSON body.
        timeout: Timeout handed to ``requests.post``.

    Returns:
        ``(status_code, data)``.  On a completed HTTP exchange these are the
        upstream status and its decoded JSON body, or a ``NON_JSON`` error
        dict carrying the first 2000 characters of the response text when the
        body is not JSON.  A timeout yields ``504`` / ``UPSTREAM_TIMEOUT`` and
        any other failure yields ``500`` / ``EXEC_ERROR``.
    """
    endpoint = f"{_resolve_internal_base(request)}/api/agents/execute"
    envelope = {"agent": agent, "payload": payload}
    try:
        response = requests.post(endpoint, json=envelope, timeout=timeout)
        # The body is decoded whatever the status code is, so upstream error
        # details reach the caller.
        try:
            parsed = response.json()
        except Exception:
            parsed = {"code": "NON_JSON", "message": response.text[:2000]}
        return response.status_code, parsed
    except requests.Timeout:
        return 504, {"code": "UPSTREAM_TIMEOUT", "message": "agent upstream timed out"}
    except Exception as exc:
        return 500, {"code": "EXEC_ERROR", "message": str(exc)}
# ---------- text builders ----------
def _text_sami(data: Dict[str, Any]) -> str:
    """Build the plain-text summary of a SAMI agent response.

    * When both ``beta`` and ``r2`` are present and not ``None``: a header
      line, up to three residual rows ordered by ``rank`` (rows without a
      numeric rank sort last), and the ``share_url`` after a blank line when
      there is one.
    * Otherwise, when ``code`` is set: a warning line with code and message.
    * Otherwise: a generic "ready" message.

    ``None`` values for ``rank`` / ``sami`` and residual entries that are not
    dicts are tolerated.  Values that are present but not numeric still raise
    from the number formatting.

    Args:
        data: Decoded agent response.

    Returns:
        The summary text.
    """
    beta = data.get("beta")
    r2 = data.get("r2")
    if beta is not None and r2 is not None:
        lines = [f"SAMI run: β={beta:.3f}, R²={r2:.3f}"]

        residuals = data.get("residuals")
        rows = (
            [row for row in residuals if isinstance(row, dict)]
            if isinstance(residuals, (list, tuple))
            else []
        )

        def _rank_key(row: Dict[str, Any]) -> float:
            # A null or non-numeric rank must not break the sort; push it last.
            rank = row.get("rank")
            return rank if isinstance(rank, (int, float)) else 1e9

        for row in sorted(rows, key=_rank_key)[:3]:
            sami = row.get("sami") or 0  # explicit null counts as 0
            lines.append(f"{row.get('rank')}. {row.get('city')}: {sami:+0.2f}")

        if data.get("share_url"):
            lines += ["", data["share_url"]]
        return "\n".join(lines)

    if data.get("code"):
        return f"⚠️ {data.get('code')}: {data.get('message','')}"
    return "SAMI results ready."
def _text_sites(data: Dict[str, Any]) -> str:
    """Build the plain-text summary of a sites agent response.

    * When ``candidates`` is a list: a header naming business and city
      (``?`` when missing), one line for each of the first three candidate
      dicts, and the ``share_url`` after a blank line when there is one.
    * Otherwise, when ``code`` is set: a warning line with code and message.
    * Otherwise: a generic "ready" message.

    Missing or ``None`` ``lat`` / ``lon`` / ``score`` values are shown as 0,
    and candidate entries that are not dicts are skipped.  Values that are
    present but not numeric still raise from the number formatting.

    Args:
        data: Decoded agent response.

    Returns:
        The summary text.
    """
    candidates = data.get("candidates")
    if isinstance(candidates, list):
        city = data.get("city", "?")
        business = data.get("business", "?")
        lines = [f"Top sites for {business} in {city}:"]

        rows = [cand for cand in candidates if isinstance(cand, dict)][:3]
        for idx, cand in enumerate(rows, 1):
            # ``or 0`` also covers an explicit JSON null, which a plain
            # ``.get(key, 0)`` default would let through to the formatter.
            lat = cand.get("lat") or 0
            lon = cand.get("lon") or 0
            score = cand.get("score") or 0
            lines.append(f"{idx}. score={score:.2f} @ ({lat:.5f},{lon:.5f})")

        if data.get("share_url"):
            lines.append("")
            lines.append(data["share_url"])
        return "\n".join(lines)

    if data.get("code"):
        return f"⚠️ {data.get('code')}: {data.get('message','')}"
    return "Site scoring ready."
# ---------- views ----------
@csrf_exempt
@require_http_methods(["GET", "POST"])
def format_sami(request: HttpRequest):
    """Run the ``sami`` agent for this request and return its formatted result.

    The payload is taken from the request body, forwarded to the agent, and
    the agent's response is returned as JSON with the upstream status code.
    Before returning, the response gets public URLs, an ``_echo`` entry
    describing where the payload came from (unless already present), and a
    ``text`` summary when one can be built.
    """
    body = _load_body(request)
    payload, src = _extract_payload(body)
    if not isinstance(payload, dict):
        # Text after the command parsed as JSON but not as an object.
        payload, src = {}, "empty"

    status, data = _post_execute(request, "sami", payload, timeout=30.0)

    # add echo + text
    if not isinstance(data, dict):
        data = {"result": data}
    _normalize_urls_to_public(data, request)
    data.setdefault("_echo", {"src": src, "payload_keys": list(payload)})
    try:
        data["text"] = _text_sami(data)
    except Exception:
        # The summary is optional: still answer without it, but leave a trace
        # instead of hiding the failure.
        logging.getLogger(__name__).warning(
            "format_sami: could not build text summary", exc_info=True
        )
    return JsonResponse(data, status=status, safe=False)
@csrf_exempt
@require_http_methods(["GET", "POST"])
def format_sites(request: HttpRequest):
    """Run the ``sites`` agent for this request and return its formatted result.

    The payload is taken from the request body as JSON; when none is found,
    the command text is parsed as shorthand instead.  Defaults are applied,
    the payload is forwarded to the agent, and the agent's response is
    returned as JSON with the upstream status code.  Before returning, the
    response gets public URLs, an ``_echo`` entry describing where the payload
    came from (unless already present), and a ``text`` summary when one can be
    built.
    """
    body = _load_body(request)
    payload, src = _extract_payload(body)
    if not isinstance(payload, dict):
        # Shorthand such as "/sites 15" is valid JSON but not an object;
        # treat it as "no JSON payload" so the shorthand parser sees it.
        payload, src = {}, "empty"
    if not payload and src == "empty":
        payload, src = _parse_sites_shorthand(body)
    payload = _apply_sites_defaults(payload, body)

    status, data = _post_execute(request, "sites", payload, timeout=30.0)

    if not isinstance(data, dict):
        data = {"result": data}
    _normalize_urls_to_public(data, request)
    data.setdefault("_echo", {"src": src, "payload_keys": list(payload)})
    try:
        data["text"] = _text_sites(data)
    except Exception:
        # The summary is optional: still answer without it, but leave a trace
        # instead of hiding the failure.
        logging.getLogger(__name__).warning(
            "format_sites: could not build text summary", exc_info=True
        )
    return JsonResponse(data, status=status, safe=False)