Ekaropolus 3fa732efbc
All checks were successful
continuous-integration/drone/push Build is passing
Routing for media
2025-09-17 18:39:47 -06:00

250 lines
9.4 KiB
Python

# pxy_bots/views.py
import json
import logging
from typing import Any, Dict, Optional
from telegram import Bot
from django.http import JsonResponse, HttpResponse
from django.views.decorators.csrf import csrf_exempt
from django.core.cache import cache
from asgiref.sync import sync_to_async
from .models import TelegramBot
from .router import pick_db_route, post_json
from .renderer import render_spec
logger = logging.getLogger(__name__)
# ---------------------------
# Canonical req.v1 builder
# ---------------------------
def _pick_photo(sizes):
    """Describe the largest entry of a Telegram ``photo`` size list.

    Args:
        sizes: Iterable of PhotoSize-like dicts (``file_id``, ``width``,
            ``height`` and optionally ``file_size``). May be empty or ``None``.

    Returns:
        A media dict with ``type`` fixed to ``"photo"`` and ``mime`` fixed to
        ``"image/jpeg"``, or ``None`` when there is nothing to pick from.
    """
    if not sizes:
        return None

    def _area(size):
        # ``or 0`` also covers dimensions that are present but null, which
        # would otherwise make the multiplication raise TypeError.
        return (size.get("width") or 0) * (size.get("height") or 0)

    # max() returns the first maximal element, i.e. the same entry a stable
    # descending sort would put first, without sorting the whole list.
    top = max(sizes, key=_area)
    return {
        "type": "photo",
        "file_id": top.get("file_id"),
        "mime": "image/jpeg",
        # Forward the size when Telegram provides it, like the other media kinds.
        "size_bytes": top.get("file_size"),
        "width": top.get("width"),
        "height": top.get("height"),
    }
def _extract_media(msg: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Return a normalized description of the first media attachment in ``msg``.

    Attachment kinds are checked in a fixed priority order: photo, voice,
    audio, video, video_note, animation, document. Photos are delegated to
    ``_pick_photo``; every other kind is copied field by field from the
    corresponding sub-object. Returns ``None`` when no known kind is present.
    """
    if "photo" in msg:
        return _pick_photo(msg.get("photo") or [])

    # (message key, whether ``mime_type`` is forwarded, extra fields copied
    # verbatim) — listed in priority order.
    layout = (
        ("voice", True, ("duration",)),
        ("audio", True, ("duration",)),
        ("video", True, ("duration", "width", "height")),
        ("video_note", False, ("duration", "length")),
        ("animation", True, ()),
        ("document", True, ("file_name",)),
    )
    for kind, forwards_mime, extras in layout:
        if kind not in msg:
            continue
        part = msg[kind]
        described = {
            "type": kind,
            "file_id": part.get("file_id"),
            "mime": part.get("mime_type") if forwards_mime else None,
            "size_bytes": part.get("file_size"),
        }
        for field in extras:
            described[field] = part.get(field)
        return described
    return None
def build_req_v1(update: Dict[str, Any], bot_name: str) -> Dict[str, Any]:
    """
    Normalize a Telegram update into our canonical req.v1 envelope.
    Pure function. No network, no state.

    Args:
        update: Decoded Telegram update. ``message`` / ``edited_message`` and
            ``callback_query`` are recognized; anything else produces an
            envelope with trigger ``"unknown"``.
        bot_name: Name reported as ``bot.username``. It is also compared
            against an ``@mention`` suffix on slash commands.

    Returns:
        Dict with the keys ``schema_version``, ``bot``, ``chat``, ``user``,
        ``command``, ``input``, ``callback`` and ``context``.

    Command name rules:
        * ``/name ...`` in the text (or, failing that, the caption) gives
          ``name``. A trailing ``@mention`` is dropped only when it equals
          ``bot_name`` case-insensitively, so ``/start@ThisBot`` routes like
          ``/start`` while commands addressed to another bot keep their suffix.
        * Without a slash command, media yields ``__<type>__`` and a bare
          location yields ``__location__`` so those updates can be routed too.
        * Callback queries never get a command name; they carry ``data``.
    """
    schema_version = "req.v1"
    update_id = update.get("update_id")
    msg = update.get("message") or update.get("edited_message")
    cbq = update.get("callback_query")

    # Only the message branch can populate these; the others leave them None.
    text = caption = location = media = None
    if msg:
        chat = msg.get("chat") or {}
        user = msg.get("from") or {}
        message_id = msg.get("message_id")
        ts = msg.get("date")
        text = msg.get("text")
        caption = msg.get("caption")
        location = msg.get("location")
        media = _extract_media(msg)
        trigger = "message"
    elif cbq:
        origin = cbq.get("message") or {}
        chat = origin.get("chat") or {}
        user = cbq.get("from") or {}
        message_id = origin.get("message_id")
        ts = origin.get("date") or None
        trigger = "callback"
    else:
        chat = {}
        user = update.get("from") or {}
        message_id = None
        ts = None
        trigger = "unknown"

    # Slash command: text takes precedence over caption.
    raw_cmd = None
    for candidate in (text, caption):
        if candidate and isinstance(candidate, str) and candidate.startswith("/"):
            raw_cmd = candidate.split()[0][1:]
            break

    # In group chats Telegram clients send "/cmd@BotUsername". Strip the
    # mention when it addresses this bot so the route lookup sees "cmd".
    if raw_cmd and "@" in raw_cmd:
        name, _, mention = raw_cmd.partition("@")
        if name and mention.lower() == (bot_name or "").lower():
            raw_cmd = name

    # Pseudo-commands for media/location so they can be routed from Admin.
    if not raw_cmd:
        if media and isinstance(media, dict):
            mtype = (media.get("type") or "").lower()
            if mtype in {"voice", "audio", "video", "photo", "document", "animation", "video_note"}:
                raw_cmd = f"__{mtype}__"
        elif location:
            raw_cmd = "__location__"

    env = {
        "schema_version": schema_version,
        "bot": {"username": bot_name},
        "chat": {"id": chat.get("id"), "type": chat.get("type")},
        "user": {"id": user.get("id"), "language": user.get("language_code")},
        "command": {
            "name": raw_cmd,
            "version": 1,
            # Any named command on a message (pseudo-commands included) is
            # reported as "text_command"; otherwise the raw trigger is kept.
            "trigger": "text_command" if raw_cmd and trigger == "message" else trigger,
        },
        "input": {
            "text": text,
            "caption": caption,
            "args_raw": text or caption,
            "media": media,
            "location": (
                {"lat": location.get("latitude"), "lon": location.get("longitude")}
                if location else None
            ),
        },
        "callback": (
            {"id": cbq.get("id"), "data": cbq.get("data"),
             "origin": {"message_id": message_id, "chat_id": chat.get("id")}}
            if cbq else None
        ),
        "context": {
            "message_id": message_id,
            "update_id": update_id,
            "ts": ts,
            # NOTE(review): for callbacks message_id is the message the button
            # is attached to, so repeated presses share this key — confirm
            # that downstream consumers expect that.
            "idempotency_key": f"tg:{message_id}:{user.get('id')}" if message_id and user.get("id") else None,
        },
    }
    return env
# ---------------------------
# Webhook (thin router)
# ---------------------------
@csrf_exempt
async def telegram_webhook(request, bot_name: str):
    """Receive a Telegram webhook update for ``bot_name`` and route it.

    Flow: reject non-POST requests, load the active ``TelegramBot``, parse the
    JSON payload, drop duplicates seen within ~90s, build the canonical req.v1
    envelope, look up a route with ``pick_db_route``, POST the envelope to it
    and render any reply spec back to the originating chat.

    Returns:
        405 for non-POST requests; 400 for an unknown bot, an undecodable or
        non-object payload, or a failed canonical build; 200 JSON for skipped
        duplicates and routed updates; an empty 204 when no route matches;
        500 JSON on any unexpected error.
    """
    try:
        logger.info("Webhook called for bot=%s", bot_name)
        if request.method != "POST":
            return HttpResponse(status=405)

        # Load bot (sync ORM via sync_to_async)
        try:
            bot_instance = await sync_to_async(TelegramBot.objects.get)(
                name=bot_name, is_active=True
            )
        except TelegramBot.DoesNotExist:
            return JsonResponse({"error": f"Bot '{bot_name}' not found."}, status=400)

        # Parse raw payload. ValueError covers both json.JSONDecodeError and
        # UnicodeDecodeError, so a non-UTF-8 body is a 400 rather than a 500.
        try:
            payload = json.loads(request.body.decode("utf-8") or "{}")
        except ValueError:
            return JsonResponse({"ok": False, "error": "invalid_json"}, status=400)
        if not isinstance(payload, dict):
            # Valid JSON that is not an object cannot be an update.
            return JsonResponse({"ok": False, "error": "invalid_json"}, status=400)

        # ----- Idempotency / retry guard (drops duplicates for ~90s) -----
        upd_id = payload.get("update_id")
        cbq = payload.get("callback_query") or {}
        cbq_id = cbq.get("id")
        msg = payload.get("message") or {}
        fallback_msg_id = msg.get("message_id")
        fallback_user = (msg.get("from") or {}).get("id")

        # Keys are namespaced by bot so that identical ids arriving for two
        # different bots are not mistaken for a retry of the same update.
        dedupe_key = None
        if upd_id is not None:
            dedupe_key = f"tg:{bot_name}:update:{upd_id}"
        elif cbq_id:
            dedupe_key = f"tg:{bot_name}:cbq:{cbq_id}"
        elif fallback_msg_id and fallback_user:
            dedupe_key = f"tg:{bot_name}:msg:{fallback_msg_id}:{fallback_user}"

        # cache.add only stores the key when it is absent; a falsy result
        # means the same update was already seen inside the window.
        if dedupe_key and not cache.add(dedupe_key, "1", timeout=90):
            logger.info("tg.idempotent.skip key=%s", dedupe_key)
            return JsonResponse({"status": "duplicate_skipped"})
        # -----------------------------------------------------------------

        # Build canonical req.v1
        try:
            canon = build_req_v1(payload, bot_name)
            logger.info("tg.canonical env=%s", json.dumps(canon, ensure_ascii=False))
        except Exception as e:
            logger.exception("tg.canonical.failed: %s", e)
            return JsonResponse({"ok": False, "error": "canonical_failed"}, status=400)

        # ----- DB-driven route (Admin) -----
        # Run the lookup off the event loop like the other synchronous calls
        # in this view (presumably it queries the ORM — verify in router.py).
        route = await sync_to_async(pick_db_route)(bot_name, canon)
        if route:
            chat_id = (canon.get("chat") or {}).get("id")
            status, body = await sync_to_async(post_json)(
                route["url"], canon, route.get("timeout", 4), route.get("headers")
            )
            logger.info("tg.routed(db) url=%s status=%s", route["url"], status)

            # Accept either a full render spec or a bare {"text": ...} reply.
            spec = None
            if isinstance(body, dict) and ("messages" in body or body.get("schema_version") == "render.v1"):
                spec = body
            elif isinstance(body, dict) and "text" in body:
                spec = {"schema_version": "render.v1",
                        "messages": [{"type": "text", "text": str(body["text"])}]}

            if spec and chat_id:
                bot = Bot(token=bot_instance.token)
                await render_spec(bot=bot, chat_id=chat_id, spec=spec)
            return JsonResponse({"status": "ok", "routed": True, "status_code": status})

        # Nothing matched → 204 (ack, but no action)
        logger.warning(
            "tg.no_route bot=%s trigger=%s cmd=%s",
            bot_name,
            (canon.get("command") or {}).get("trigger"),
            (canon.get("command") or {}).get("name"),
        )
        # A 204 response must not carry a body, so no JSON payload here.
        return HttpResponse(status=204)
    except Exception as e:
        logger.exception("Error in webhook: %s", e)
        return JsonResponse({"error": f"Unexpected error: {str(e)}"}, status=500)