# pxy_bots/views.py
import json
import logging
from typing import Any, Dict, Optional

from telegram import Bot
from django.http import JsonResponse, HttpResponse
from django.views.decorators.csrf import csrf_exempt
from django.core.cache import cache
from asgiref.sync import sync_to_async

from .models import TelegramBot
from .router import pick_db_route, pick_url, post_json
from .renderer import render_spec

logger = logging.getLogger(__name__)

# Seconds a processed update is remembered by the idempotency guard.
_DEDUPE_TTL_SECONDS = 90


# ---------------------------
# Canonical req.v1 builder
# ---------------------------

def _pick_photo(sizes):
    """Return a media descriptor for the largest entry of ``sizes``.

    ``sizes`` is the list found under a message's ``"photo"`` key. The entry
    with the greatest ``width * height`` wins (the first one on ties).
    Returns ``None`` when the list is empty or missing.
    """
    if not sizes:
        return None
    # `or 0` also covers an explicit null dimension, which `.get(k, 0)` does not.
    top = max(
        sizes,
        key=lambda s: (s.get("width") or 0) * (s.get("height") or 0),
    )
    return {
        "type": "photo",
        "file_id": top.get("file_id"),
        # The size entries carry no mime type; the original code always
        # reports JPEG for photos, so that is preserved.
        "mime": "image/jpeg",
        # Report the size when the entry provides one (None otherwise),
        # matching how every other media type fills `size_bytes`.
        "size_bytes": top.get("file_size"),
        "width": top.get("width"),
        "height": top.get("height"),
    }


# Non-photo media kinds in priority order, each with the extra fields copied
# verbatim from the Telegram object. "animation" is deliberately checked
# before "document", as in the original if-chain.
_MEDIA_EXTRA_FIELDS = (
    ("voice", ("duration",)),
    ("audio", ("duration",)),
    ("video", ("duration", "width", "height")),
    ("video_note", ("duration", "length")),
    ("animation", ()),
    ("document", ("file_name",)),
)


def _extract_media(msg: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Return a normalized descriptor for the first media item in ``msg``.

    Photos take precedence; the remaining kinds are tried in the order given
    by ``_MEDIA_EXTRA_FIELDS``. Every descriptor has ``type``, ``file_id``,
    ``mime`` and ``size_bytes`` plus the kind-specific extras. Returns
    ``None`` when the message carries none of the known media keys (or when
    a ``"photo"`` key is present but empty).
    """
    if "photo" in msg:
        return _pick_photo(msg.get("photo") or [])

    for kind, extra_fields in _MEDIA_EXTRA_FIELDS:
        if kind not in msg:
            continue
        # Tolerate an explicit null value instead of raising AttributeError.
        obj = msg[kind] or {}
        media = {
            "type": kind,
            "file_id": obj.get("file_id"),
            # Yields None for kinds that carry no mime type (e.g. video_note).
            "mime": obj.get("mime_type"),
            "size_bytes": obj.get("file_size"),
        }
        for field in extra_fields:
            media[field] = obj.get(field)
        return media
    return None


def build_req_v1(update: Dict[str, Any], bot_name: str) -> Dict[str, Any]:
    """
    Normalize a Telegram update into our canonical req.v1 envelope.
    Pure function. No network, no state.

    ``update`` is the decoded webhook payload and ``bot_name`` is echoed
    under ``bot.username``. Three shapes are recognised:

    * ``message`` / ``edited_message`` -> trigger ``"message"``, or
      ``"text_command"`` when the text or caption starts with ``/``;
    * ``callback_query`` -> trigger ``"callback"``;
    * anything else -> trigger ``"unknown"`` with mostly-null fields.
    """
    schema_version = "req.v1"
    update_id = update.get("update_id")
    msg = update.get("message") or update.get("edited_message")
    cbq = update.get("callback_query")

    # Defaults shared by the callback and unknown branches.
    text = caption = location = media = None

    if msg:
        chat = msg.get("chat") or {}
        user = msg.get("from") or {}
        message_id = msg.get("message_id")
        ts = msg.get("date")
        text = msg.get("text")
        caption = msg.get("caption")
        location = msg.get("location")
        media = _extract_media(msg)
        trigger = "message"
    elif cbq:
        # For callbacks the chat/message come from the message the button
        # was attached to; the user is whoever pressed it.
        origin = cbq.get("message") or {}
        chat = origin.get("chat") or {}
        user = cbq.get("from") or {}
        message_id = origin.get("message_id")
        ts = origin.get("date") or None
        trigger = "callback"
    else:
        chat = {}
        user = update.get("from") or {}
        message_id = None
        ts = None
        trigger = "unknown"

    # A leading "/" in the text (or, failing that, the caption) marks a
    # command; its name is the first whitespace-delimited token minus the
    # slash. Callbacks never get a command name: they carry `data` instead.
    raw_cmd = None
    for candidate in (text, caption):
        if candidate and isinstance(candidate, str):
            if candidate.startswith("/"):
                raw_cmd = candidate.split()[0][1:]
            # Text shadows the caption even when the text is not a command.
            break

    user_id = user.get("id")
    return {
        "schema_version": schema_version,
        "bot": {"username": bot_name},
        "chat": {"id": chat.get("id"), "type": chat.get("type")},
        "user": {"id": user_id, "language": user.get("language_code")},
        "command": {
            "name": raw_cmd,
            "version": 1,
            "trigger": (
                "text_command" if raw_cmd and trigger == "message" else trigger
            ),
        },
        "input": {
            "text": text,
            "caption": caption,
            "args_raw": text or caption,
            "media": media,
            "location": (
                {
                    "lat": location.get("latitude"),
                    "lon": location.get("longitude"),
                }
                if location
                else None
            ),
        },
        "callback": (
            {
                "id": cbq.get("id"),
                "data": cbq.get("data"),
                "origin": {"message_id": message_id, "chat_id": chat.get("id")},
            }
            if cbq
            else None
        ),
        "context": {
            "message_id": message_id,
            "update_id": update_id,
            "ts": ts,
            "idempotency_key": (
                f"tg:{message_id}:{user_id}" if message_id and user_id else None
            ),
        },
    }


# ---------------------------
# Webhook (thin router)
# ---------------------------

def _dedupe_key(payload: Dict[str, Any], bot_name: str) -> Optional[str]:
    """Build the cache key used to drop re-delivered updates.

    Preference order: ``update_id``, then the callback query id, then the
    message id / sender id pair. The key is scoped by ``bot_name`` so that
    bots sharing one cache cannot suppress each other's updates. Returns
    ``None`` when the payload has no usable identifier.
    """
    update_id = payload.get("update_id")
    if update_id is not None:
        return f"tg:{bot_name}:update:{update_id}"

    cbq_id = (payload.get("callback_query") or {}).get("id")
    if cbq_id:
        return f"tg:{bot_name}:cbq:{cbq_id}"

    msg = payload.get("message") or {}
    msg_id = msg.get("message_id")
    sender_id = (msg.get("from") or {}).get("id")
    if msg_id and sender_id:
        return f"tg:{bot_name}:msg:{msg_id}:{sender_id}"
    return None


def _claim_update(key: str) -> bool:
    """Record ``key`` in the cache; return False if it was already there.

    Kept as a plain sync function so the cache backend is resolved and used
    inside the worker thread that ``sync_to_async`` runs it in.
    """
    return cache.add(key, "1", timeout=_DEDUPE_TTL_SECONDS)


def _coerce_render_spec(body: Any) -> Optional[Dict[str, Any]]:
    """Turn a downstream response body into a render.v1 spec, if possible.

    A dict that has ``messages`` or declares ``schema_version == "render.v1"``
    is used as-is; a dict with only ``text`` is wrapped into a single text
    message. Anything else yields ``None`` (nothing to render).
    """
    if not isinstance(body, dict):
        return None
    if "messages" in body or body.get("schema_version") == "render.v1":
        return body
    if "text" in body:
        return {
            "schema_version": "render.v1",
            "messages": [{"type": "text", "text": str(body["text"])}],
        }
    return None


async def _forward_and_render(bot_instance, canon, url, *post_args, source):
    """POST ``canon`` to ``url`` and render any reply back to the chat.

    ``post_args`` are passed through to ``post_json`` after the payload
    (the DB route supplies timeout and headers; the in-memory route supplies
    nothing). ``source`` only labels the log line. Returns the JSON
    acknowledgement sent back to the webhook caller.
    """
    status, body = await sync_to_async(post_json)(url, canon, *post_args)
    logger.info("tg.routed(%s) url=%s status=%s", source, url, status)

    spec = _coerce_render_spec(body)
    chat_id = (canon.get("chat") or {}).get("id")
    if spec and chat_id:
        bot = Bot(token=bot_instance.token)
        await render_spec(bot=bot, chat_id=chat_id, spec=spec)
    return JsonResponse({"status": "ok", "routed": True, "status_code": status})


@csrf_exempt
async def telegram_webhook(request, bot_name: str):
    """Receive a Telegram update for ``bot_name`` and route it downstream.

    Steps: validate method and bot, parse the JSON body, drop duplicates
    seen within the dedupe window, build the req.v1 envelope, then forward
    it to the DB-configured route or, failing that, the in-memory route.

    Responses: 405 for non-POST, 400 for an unknown bot / bad payload /
    envelope failure, 200 when routed or skipped as a duplicate, 204 when
    no route matches, 500 on any unexpected error.
    """
    try:
        logger.info("Webhook called for bot=%s", bot_name)
        if request.method != "POST":
            return HttpResponse(status=405)

        # Load bot (sync ORM via sync_to_async)
        try:
            bot_instance = await sync_to_async(TelegramBot.objects.get)(
                name=bot_name, is_active=True
            )
        except TelegramBot.DoesNotExist:
            return JsonResponse({"error": f"Bot '{bot_name}' not found."}, status=400)

        # Parse raw payload. Undecodable bytes are a client error too, not a 500.
        try:
            payload = json.loads(request.body.decode("utf-8") or "{}")
        except (UnicodeDecodeError, json.JSONDecodeError):
            return JsonResponse({"ok": False, "error": "invalid_json"}, status=400)
        # Valid JSON that is not an object (e.g. a list) cannot be an update.
        if not isinstance(payload, dict):
            return JsonResponse({"ok": False, "error": "invalid_payload"}, status=400)

        # ----- Idempotency / retry guard (drops duplicates for ~90s) -----
        dedupe_key = _dedupe_key(payload, bot_name)
        if dedupe_key:
            # Cache access is blocking I/O; keep it off the event loop.
            first_seen = await sync_to_async(_claim_update)(dedupe_key)
            if not first_seen:
                logger.info("tg.idempotent.skip key=%s", dedupe_key)
                return JsonResponse({"status": "duplicate_skipped"})
        # -----------------------------------------------------------------

        # Build canonical req.v1
        try:
            canon = build_req_v1(payload, bot_name)
            logger.info("tg.canonical env=%s", json.dumps(canon, ensure_ascii=False))
        except Exception as e:
            logger.exception("tg.canonical.failed: %s", e)
            return JsonResponse({"ok": False, "error": "canonical_failed"}, status=400)

        # ----- DB-driven route (Admin) -----
        # NOTE(review): pick_db_route presumably queries the ORM; running it
        # via sync_to_async is safe either way and avoids calling sync-only
        # code from this async view.
        route = await sync_to_async(pick_db_route)(bot_name, canon)
        if route:
            return await _forward_and_render(
                bot_instance,
                canon,
                route["url"],
                route.get("timeout", 4),
                route.get("headers"),
                source="db",
            )

        # ----- In-memory fallback (optional; remove when all routes live in DB) -----
        try:
            route_url = pick_url(bot_name, canon)
        except Exception:
            # Best-effort fallback: keep going, but leave a trace of the failure.
            logger.exception("tg.pick_url.failed bot=%s", bot_name)
            route_url = None
        if route_url:
            return await _forward_and_render(
                bot_instance, canon, route_url, source="mem"
            )

        # Nothing matched → 204 (ack, but no action)
        logger.warning(
            "tg.no_route bot=%s trigger=%s cmd=%s",
            bot_name,
            (canon.get("command") or {}).get("trigger"),
            (canon.get("command") or {}).get("name"),
        )
        # A 204 response must not carry a body, so no JSON payload here.
        return HttpResponse(status=204)

    except Exception as e:
        logger.exception("Error in webhook: %s", e)
        # NOTE(review): this echoes the exception text to the caller of a
        # public endpoint; consider a generic message if that is a concern.
        return JsonResponse({"error": f"Unexpected error: {str(e)}"}, status=500)