Ekaropolus 0eb2b393f2
All checks were successful
continuous-integration/drone/push Build is passing
SAMI Functionality add
2025-09-16 16:18:45 -06:00

205 lines
8.6 KiB
Python

# pxy_routing/services/ors_provider.py
from __future__ import annotations
from typing import Any, Dict, Iterable, List, Tuple
import math, os, time, random, requests
# Optional graceful fallback: the crow-fly provider is imported from the
# sibling module ``crowfly_provider``. Any failure while importing it simply
# leaves the fallback disabled instead of breaking this module.
try:
    from .crowfly_provider import CrowFlyRoutingProvider
except Exception:
    CrowFlyRoutingProvider = None  # fallback disabled if not available

# Coordinate pair used throughout the public API, ordered (lat, lon).
LatLon = Tuple[float, float]
class ORSRoutingProvider:
    """
    ORS isochrones with retries/backoff and optional crow-fly fallback.

    Precedence: the ``base_url`` and ``api_key`` constructor arguments win
    over the environment. For every other setting the environment variable,
    when set, overrides the constructor argument (ORS_GENERALIZE and
    ORS_MAX_RANGE only when they are non-negative integer strings).

    Env:
      ORS_BASE_URL       e.g., https://api.openrouteservice.org
      ORS_API_KEY        key (omit/blank for self-host)
      ORS_PROFILE        driving-car | cycling-regular | foot-walking (default: driving-car)
      ORS_TIMEOUT_S      request timeout seconds (default: 5)
      ORS_GENERALIZE     generalization in meters for polygons (optional, e.g., 20)
      ORS_MAX_RANGE      safety cap in minutes (optional; e.g., 45)
      # Hardening knobs:
      ORS_RETRY          number of retries on HTTP 429/502/503/504 (default: 2)
      ORS_BACKOFF_BASE_S base seconds for exponential backoff (default: 0.8)
      ORS_FALLBACK       set to "crowfly" to degrade gracefully on errors
                         (non-200 responses, transport errors, undecodable
                         or empty responses)
    """

    # HTTP statuses treated as transient and retried by ``_post``.
    _RETRY_STATUSES = (429, 502, 503, 504)

    # Human-readable hints appended to the error raised by ``isochrone``.
    _HTTP_HINTS = {
        400: "Bad request (profile/range/params).",
        401: "Unauthorized (check ORS_API_KEY).",
        403: "Forbidden (quota/key).",
        404: "Profile/endpoint not found.",
        413: "Payload too large.",
        422: "Unprocessable (non-routable location or bad range).",
        429: "Rate limited.",
        500: "Server error.",
        502: "Bad gateway.",
        503: "Service unavailable.",
        504: "Gateway timeout.",
    }

    def __init__(
        self,
        base_url: str | None = None,
        api_key: str | None = None,
        profile: str = "driving-car",
        timeout_s: int = 5,
        generalize: int | None = None,
        max_range_min: int | None = None,
    ):
        """
        Configure the provider from arguments and environment variables.

        Raises:
            ValueError: if no base URL is given by argument or ORS_BASE_URL
                (also raised by ``int``/``float`` on malformed numeric
                ORS_TIMEOUT_S / ORS_RETRY / ORS_BACKOFF_BASE_S values).
        """
        self.base_url = (base_url or os.getenv("ORS_BASE_URL") or "").rstrip("/")
        self.api_key = api_key if api_key is not None else os.getenv("ORS_API_KEY", "")
        self.profile = os.getenv("ORS_PROFILE", profile)
        self.timeout_s = int(os.getenv("ORS_TIMEOUT_S", str(timeout_s)))

        gen = os.getenv("ORS_GENERALIZE")
        self.generalize = int(gen) if (gen and gen.isdigit()) else generalize
        mr = os.getenv("ORS_MAX_RANGE")
        self.max_range_min = int(mr) if (mr and mr.isdigit()) else max_range_min

        # Hardening knobs
        self.retries = int(os.getenv("ORS_RETRY", "2"))
        self.backoff_base = float(os.getenv("ORS_BACKOFF_BASE_S", "0.8"))
        self.fallback_mode = (os.getenv("ORS_FALLBACK") or "").strip().lower()
        self._fallback = None
        # CrowFlyRoutingProvider is None when its optional import failed.
        if self.fallback_mode == "crowfly" and CrowFlyRoutingProvider:
            self._fallback = CrowFlyRoutingProvider()

        if not self.base_url:
            raise ValueError("ORS_BASE_URL is required for ORSRoutingProvider")

        self._iso_url = f"{self.base_url}/v2/isochrones/{self.profile}"
        self._headers = {
            "Content-Type": "application/json; charset=utf-8",
            "Accept": "application/json, application/geo+json",
        }
        if self.api_key:
            self._headers["Authorization"] = self.api_key

    # ---------- internals ----------
    def _post(self, url: str, payload: Dict[str, Any]) -> requests.Response:
        """
        POST ``payload`` as JSON, retrying transient HTTP statuses.

        Up to ``self.retries`` extra attempts are made when the response
        status is in ``_RETRY_STATUSES``; the sleep between attempts grows
        exponentially from ``self.backoff_base`` with +/-25% jitter. The last
        response is returned whatever its status. Transport exceptions from
        ``requests`` are not retried and propagate to the caller.
        """
        extra_attempts = max(0, self.retries)
        for attempt in range(extra_attempts):
            resp = requests.post(url, json=payload, headers=self._headers, timeout=self.timeout_s)
            if resp.status_code not in self._RETRY_STATUSES:
                return resp
            delay = self.backoff_base * (2 ** attempt) * (0.75 + 0.5 * random.random())
            time.sleep(delay)
        # Final attempt: returned as-is, even if it is still a transient status.
        return requests.post(url, json=payload, headers=self._headers, timeout=self.timeout_s)

    def _payload(self, lon: float, lat: float, ranges_s: List[int]) -> Dict[str, Any]:
        """Build the isochrone request body (location sent as [lon, lat], ranges in seconds)."""
        payload: Dict[str, Any] = {"locations": [[lon, lat]], "range": ranges_s}
        if self.generalize:
            payload["generalize"] = self.generalize
        return payload

    def _check_range(self, minutes: int) -> None:
        """Raise ValueError when ``minutes`` exceeds the configured cap (if any)."""
        if self.max_range_min and minutes > self.max_range_min:
            raise ValueError(f"minutes exceeds ORS_MAX_RANGE ({minutes} > {self.max_range_min})")

    def _fallback_feature(self, center: LatLon, minutes: int, provider_tag: str) -> Dict[str, Any]:
        """Ask the fallback provider for an isochrone and tag where it came from."""
        feat = self._fallback.isochrone(center, minutes)
        feat.setdefault("properties", {})["provider"] = provider_tag
        return feat

    # ---------- public API ----------
    def health(self) -> Dict[str, Any]:
        """
        Probe the service with a 60-second isochrone at a fixed location.

        Never raises: any exception is reported in the ``reason`` field with
        ``ok`` set to False.
        """
        try:
            lat, lon = 19.4326, -99.1332
            r = self._post(self._iso_url, self._payload(lon, lat, [60]))
            ok = (r.status_code == 200)
            return {"provider": "ors", "ok": ok, "profile": self.profile,
                    "base_url": self.base_url, "reason": None if ok else f"http {r.status_code}"}
        except Exception as e:
            return {"provider": "ors", "ok": False, "profile": self.profile,
                    "base_url": self.base_url, "reason": f"{type(e).__name__}: {e}"}

    def isochrone(self, center: LatLon, minutes: int) -> Dict[str, Any]:
        """
        Return one GeoJSON Feature with the isochrone polygon around ``center``.

        Args:
            center: (lat, lon) pair.
            minutes: travel time; converted to seconds for the request.

        Returns:
            A Feature whose ``properties`` hold provider, profile, minutes and
            center ([lon, lat]). When the fallback is enabled and ORS fails
            (transport error, non-200 status, undecodable body, or no polygon),
            the fallback provider's feature is returned instead, tagged
            ``ors_fallback_crowfly`` / ``ors_empty_fallback_crowfly``.

        Raises:
            ValueError: ``minutes`` exceeds the configured ORS_MAX_RANGE cap.
            RuntimeError: ORS answered with a non-200 status or without a
                polygon and no fallback is configured.
            requests.RequestException: transport failure without fallback.
        """
        self._check_range(minutes)
        lat, lon = center
        payload = self._payload(lon, lat, [int(minutes) * 60])

        try:
            r = self._post(self._iso_url, payload)
        except requests.RequestException:
            # Timeouts / connection errors are "errors" too: degrade if we can.
            if self._fallback is None:
                raise
            return self._fallback_feature(center, minutes, "ors_fallback_crowfly")

        if r.status_code != 200:
            if self._fallback is not None:
                return self._fallback_feature(center, minutes, "ors_fallback_crowfly")
            hint = self._HTTP_HINTS.get(r.status_code, "Unexpected error.")
            raise RuntimeError(f"ORS isochrone error: HTTP {r.status_code}. {hint}")

        try:
            data = r.json()
        except ValueError:
            # Body was not valid JSON (requests raises a ValueError subclass).
            if self._fallback is None:
                raise
            return self._fallback_feature(center, minutes, "ors_fallback_crowfly")

        geom = _largest_polygon_geometry_from_ors(data)
        if not geom:
            if self._fallback is not None:
                return self._fallback_feature(center, minutes, "ors_empty_fallback_crowfly")
            raise RuntimeError("ORS returned no polygon geometry.")

        return {
            "type": "Feature",
            "properties": {"provider": "ors", "profile": self.profile, "minutes": minutes, "center": [lon, lat]},
            "geometry": geom,
        }

    # Batch multiple ranges in one call (reduces rate-limit pressure)
    def isochrones(self, center: LatLon, minutes_list: List[int]) -> List[Dict[str, Any]]:
        """
        Return isochrone Features for several ranges using a single request.

        Args:
            center: (lat, lon) pair.
            minutes_list: travel times in minutes; an empty list yields ``[]``
                without contacting the service.

        Returns:
            One Feature per response feature that carries polygon geometry
            (features without geometry are skipped, so the result may be
            shorter than ``minutes_list``). Each feature's ``properties``
            include ``minutes`` (matched from the range ORS reports in
            ``properties.value``; None if it cannot be matched) and ``center``.

        Raises:
            ValueError: any entry exceeds the configured ORS_MAX_RANGE cap.
            RuntimeError: non-200 response and no fallback configured.
            requests.RequestException: transport failure without fallback.
        """
        minutes_list = list(minutes_list)
        # Enforce the same safety cap as the single-range call.
        for m in minutes_list:
            self._check_range(m)
        if not minutes_list:
            return []

        lat, lon = center
        secs = [int(m) * 60 for m in minutes_list]
        payload = self._payload(lon, lat, secs)

        try:
            r = self._post(self._iso_url, payload)
        except requests.RequestException:
            if self._fallback is None:
                raise
            # The service is unreachable: go straight to the fallback rather
            # than paying one more timeout per range.
            return [self._fallback_feature(center, m, "ors_fallback_crowfly") for m in minutes_list]

        if r.status_code != 200:
            # degrade by single-calls if fallback exists
            if self._fallback is not None:
                return [self.isochrone(center, m) for m in minutes_list]
            raise RuntimeError(f"ORS isochrones error: HTTP {r.status_code}")

        try:
            data = r.json()
        except ValueError:
            if self._fallback is None:
                raise
            return [self._fallback_feature(center, m, "ors_fallback_crowfly") for m in minutes_list]

        minutes_by_secs = dict(zip(secs, minutes_list))
        feats: List[Dict[str, Any]] = []
        for feat in (data.get("features") or []):
            geom = _largest_polygon_geometry_from_ors({"features": [feat]})
            if not geom:
                continue
            # Map the feature back to the requested range via properties.value
            # (seconds); positional matching is unreliable once features are skipped.
            value = (feat.get("properties") or {}).get("value")
            minutes = minutes_by_secs.get(int(value)) if isinstance(value, (int, float)) else None
            feats.append({
                "type": "Feature",
                "properties": {"provider": "ors", "profile": self.profile,
                               "minutes": minutes, "center": [lon, lat]},
                "geometry": geom,
            })
        return feats
# ---------- helpers ----------
def _largest_polygon_geometry_from_ors(fc: Dict[str, Any]) -> Dict[str, Any] | None:
    """
    Pick the largest polygon out of an ORS FeatureCollection-like dict.

    Every ``Polygon`` geometry and every member of every ``MultiPolygon``
    geometry under ``fc["features"]`` is a candidate. Candidates are ranked by
    the area of their first (outer) ring as computed by ``_polygon_area_m2``;
    on ties the earliest candidate wins.

    Args:
        fc: dict with an optional ``features`` list of GeoJSON-style features.

    Returns:
        ``{"type": "Polygon", "coordinates": rings}`` for the winning
        candidate, keeping all of its rings (outer ring plus any holes), or
        None when no feature carries Polygon/MultiPolygon coordinates.
    """
    best_geom: Dict[str, Any] | None = None
    best_area = -1.0

    for feat in fc.get("features") or []:
        geom = feat.get("geometry") or {}
        coords = geom.get("coordinates")
        if not coords:
            continue

        gtype = geom.get("type")
        if gtype == "Polygon":
            candidates = [coords]
        elif gtype == "MultiPolygon":
            candidates = [poly for poly in coords if poly]
        else:
            continue

        for rings in candidates:
            area = _polygon_area_m2(rings[0])
            if area > best_area:
                # Keep every ring so holes survive for MultiPolygon members
                # exactly as they do for plain Polygons.
                best_area, best_geom = area, {"type": "Polygon", "coordinates": rings}

    return best_geom
def _polygon_area_m2(ring_lonlat: Iterable[Iterable[float]]) -> float:
    """
    Return the planar area of a ring after spherical-Mercator projection.

    Each position is projected with the spherical Mercator formulas (sphere
    radius 6378137 m) and the shoelace formula is applied to the projected
    ring. Because the projection stretches areas away from the equator, the
    value is a ranking metric rather than a true ground area.

    Args:
        ring_lonlat: positions as (lon, lat[, ...]) in degrees; any elements
            after the first two (e.g. an altitude) are ignored. The ring is
            closed automatically when its first and last positions differ.

    Returns:
        The absolute projected area, or 0.0 for rings with fewer than three
        positions.
    """
    pts = list(ring_lonlat)
    if len(pts) < 3:
        return 0.0
    if pts[0] != pts[-1]:
        pts.append(pts[0])  # pts is a private copy, safe to extend

    radius = 6378137.0

    def project(lon: float, lat: float) -> Tuple[float, float]:
        # Clamp latitude: the Mercator y coordinate diverges at the poles.
        lat = max(min(lat, 89.9999), -89.9999)
        x = math.radians(lon) * radius
        y = math.log(math.tan(math.pi / 4 + math.radians(lat) / 2)) * radius
        return x, y

    # ``*_`` tolerates positions that carry more than two elements.
    projected = [project(float(lon), float(lat)) for lon, lat, *_ in pts]

    twice_area = 0.0
    for (x0, y0), (x1, y1) in zip(projected, projected[1:]):
        twice_area += x0 * y1 - x1 * y0
    return abs(twice_area) * 0.5