# pxy_bots/router.py import json import logging from typing import Dict, Optional, Tuple from urllib.parse import urlparse logger = logging.getLogger(__name__) # --- allowlist of outbound hosts (adjust as needed) --- ALLOWED_FORWARD_HOSTS = {"127.0.0.1", "localhost", "app.polisplexity.tech"} # --- minimal route map (in-memory) --- # Per-bot mapping. Keys inside are command names WITHOUT leading slash. # Special keys: # "_default" → used when no command name detected (plain text) # "_callback" → used for callback_query events ROUTE_MAP: Dict[str, Dict[str, str]] = { # Example: use the local echo endpoint to validate the full loop "PolisplexityBot": { "_default": "http://127.0.0.1:8000/api/bots/echo_render", "_callback": "http://127.0.0.1:8000/api/bots/echo_render", "report_trash": "http://127.0.0.1:8000/api/bots/echo_render", # add more commands here… }, # Wildcard bot (applies to any) — optional: "*": { "_default": "http://127.0.0.1:8000/api/bots/echo_render", "_callback": "http://127.0.0.1:8000/api/bots/echo_render", }, } # Try to use requests; fallback to urllib try: import requests # type: ignore _HAS_REQUESTS = True except Exception: import urllib.request # type: ignore _HAS_REQUESTS = False def _allowed(url: str) -> Tuple[bool, Optional[str]]: try: p = urlparse(url) host = (p.hostname or "").lower() if p.scheme not in {"http", "https"}: return False, "bad_scheme" if host not in ALLOWED_FORWARD_HOSTS: return False, f"host_not_allowed:{host}" return True, None except Exception as e: return False, f"invalid_url:{e}" def pick_url(bot_name: str, canon: Dict) -> Optional[str]: """Decide target URL from bot + command/trigger.""" bot_routes = ROUTE_MAP.get(bot_name) or ROUTE_MAP.get("*") or {} trigger = ((canon.get("command") or {}).get("trigger")) or "message" cmd = ((canon.get("command") or {}).get("name")) or "" if trigger == "callback": return bot_routes.get("_callback") or bot_routes.get("_default") if cmd: return bot_routes.get(cmd) or bot_routes.get("_default") return bot_routes.get("_default") def post_json(url: str, payload: Dict, timeout: float = 4.0) -> Tuple[int, Dict]: """Blocking POST JSON; never raises; returns (status, body_json_or_wrapper).""" ok, why = _allowed(url) if not ok: logger.warning("router.reject url=%s reason=%s", url, why) return 400, {"ok": False, "error": f"forward_rejected:{why}", "url": url} data = json.dumps(payload, ensure_ascii=False).encode("utf-8") headers = {"Content-Type": "application/json"} if _HAS_REQUESTS: try: r = requests.post(url, data=data, headers=headers, timeout=timeout) try: body = r.json() except Exception: body = {"text": r.text[:2000]} return r.status_code, body except Exception as e: logger.exception("router.requests_failed url=%s", url) return 502, {"ok": False, "error": f"requests_failed:{e.__class__.__name__}"} else: try: req = urllib.request.Request(url, data=data, headers=headers, method="POST") with urllib.request.urlopen(req, timeout=timeout) as resp: # nosec raw = resp.read(65536) try: body = json.loads(raw.decode("utf-8")) except Exception: body = {"text": raw.decode("utf-8", errors="replace")[:2000]} return getattr(resp, "status", 200), body except Exception as e: logger.exception("router.urllib_failed url=%s", url) return 502, {"ok": False, "error": f"urllib_failed:{e.__class__.__name__}"}