# pxy_bots/views.py
"""Telegram webhook entry point and helpers for the pxy_bots app.

Contains a pure normalizer that turns a raw Telegram update into the
canonical ``req.v1`` envelope, per-persona command dispatchers, voice
transcription through OpenAI, and the async Django webhook view.
"""
import contextlib
import json
import logging
import os
import tempfile
from typing import Any, Dict, Optional

import openai
from telegram import Update, Bot
from django.core.cache import cache
from django.http import JsonResponse, HttpResponse
from django.views.decorators.csrf import csrf_exempt
from asgiref.sync import sync_to_async

from .models import TelegramBot
from pxy_langchain.services import LangchainAIService
from .handlers import (
    start,
    help_command,
    handle_location,
    next_truck,
    report_trash,
    private_pickup,
    green_balance,
    next_route,
    complete_stop,
    missed_stop,
    city_eco_score,
    available_jobs,
    accept_job,
    next_pickup,
    complete_pickup,
    private_eco_score,
)
from .renderer import render_spec

logger = logging.getLogger(__name__)
openai.api_key = os.getenv("OPENAI_API_KEY")

# How long (seconds) an update key stays in the cache to drop duplicates.
_DEDUPE_TTL_SECONDS = 90


# ---------------------------
# Canonical req.v1 builder
# ---------------------------

def _pick_photo(sizes):
    """Return a media descriptor for the largest entry in ``sizes``.

    ``sizes`` is a list of dicts; "largest" is the biggest
    ``width * height`` product, with missing dimensions counted as 0.
    Returns ``None`` when ``sizes`` is empty or ``None``.
    """
    if not sizes:
        return None
    # max() keeps the first of several equally large entries, which is the
    # same entry a stable descending sort would put first.
    top = max(sizes, key=lambda s: s.get("width", 0) * s.get("height", 0))
    return {
        "type": "photo",
        "file_id": top.get("file_id"),
        "mime": "image/jpeg",
        "size_bytes": None,
        "width": top.get("width"),
        "height": top.get("height"),
    }


def _extract_media(msg: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Return a descriptor for the first media attachment found in ``msg``.

    Keys are checked in this order: photo, voice, audio, video, video_note,
    animation, document. Returns ``None`` when none of them is present.
    """
    if "photo" in msg:
        return _pick_photo(msg.get("photo") or [])
    if "voice" in msg:
        v = msg["voice"]
        return {
            "type": "voice",
            "file_id": v.get("file_id"),
            "mime": v.get("mime_type"),
            "size_bytes": v.get("file_size"),
            "duration": v.get("duration"),
        }
    if "audio" in msg:
        a = msg["audio"]
        return {
            "type": "audio",
            "file_id": a.get("file_id"),
            "mime": a.get("mime_type"),
            "size_bytes": a.get("file_size"),
            "duration": a.get("duration"),
        }
    if "video" in msg:
        v = msg["video"]
        return {
            "type": "video",
            "file_id": v.get("file_id"),
            "mime": v.get("mime_type"),
            "size_bytes": v.get("file_size"),
            "duration": v.get("duration"),
            "width": v.get("width"),
            "height": v.get("height"),
        }
    if "video_note" in msg:
        v = msg["video_note"]
        return {
            "type": "video_note",
            "file_id": v.get("file_id"),
            "mime": None,
            "size_bytes": v.get("file_size"),
            "duration": v.get("duration"),
            "length": v.get("length"),
        }
    if "animation" in msg:
        a = msg["animation"]
        return {
            "type": "animation",
            "file_id": a.get("file_id"),
            "mime": a.get("mime_type"),
            "size_bytes": a.get("file_size"),
        }
    if "document" in msg:
        d = msg["document"]
        return {
            "type": "document",
            "file_id": d.get("file_id"),
            "mime": d.get("mime_type"),
            "size_bytes": d.get("file_size"),
            "file_name": d.get("file_name"),
        }
    return None


def build_req_v1(update: Dict[str, Any], bot_name: str) -> Dict[str, Any]:
    """
    Normalize a Telegram update into our canonical req.v1 envelope.
    Pure function. No network, no state.

    Args:
        update: The raw update payload as a dict.
        bot_name: Stored under ``bot.username`` in the envelope.

    Returns:
        A dict with the keys ``schema_version``, ``bot``, ``chat``, ``user``,
        ``command``, ``input``, ``callback`` and ``context``.
    """
    schema_version = "req.v1"
    update_id = update.get("update_id")

    msg = update.get("message") or update.get("edited_message")
    cbq = update.get("callback_query")

    if msg:
        chat = msg.get("chat") or {}
        user = msg.get("from") or {}
        message_id = msg.get("message_id")
        ts = msg.get("date")
        text = msg.get("text")
        caption = msg.get("caption")
        location = msg.get("location")
        media = _extract_media(msg)
        trigger = "message"
    elif cbq:
        # Callbacks reference the message the button was attached to.
        origin = cbq.get("message") or {}
        chat = origin.get("chat") or {}
        user = cbq.get("from") or {}
        message_id = origin.get("message_id")
        ts = origin.get("date") or None
        text = None
        caption = None
        location = None
        media = None
        trigger = "callback"
    else:
        chat = {}
        user = update.get("from") or {}
        message_id = None
        ts = None
        text = None
        caption = None
        location = None
        media = None
        trigger = "unknown"

    # The command name is the first whitespace-delimited token without its
    # leading slash. Callbacks carry 'data' instead, so they have no command.
    raw_cmd = None
    if text and isinstance(text, str) and text.startswith("/"):
        raw_cmd = text.split()[0][1:]
    elif caption and isinstance(caption, str) and caption.startswith("/"):
        raw_cmd = caption.split()[0][1:]

    user_id = user.get("id")
    env = {
        "schema_version": schema_version,
        "bot": {"username": bot_name},
        "chat": {"id": chat.get("id"), "type": chat.get("type")},
        "user": {"id": user_id, "language": user.get("language_code")},
        "command": {
            "name": raw_cmd,
            "version": 1,
            "trigger": "text_command" if raw_cmd and trigger == "message" else trigger,
        },
        "input": {
            "text": text,
            "caption": caption,
            "args_raw": text or caption,
            "media": media,
            "location": (
                {"lat": location.get("latitude"), "lon": location.get("longitude")}
                if location
                else None
            ),
        },
        "callback": (
            {
                "id": cbq.get("id"),
                "data": cbq.get("data"),
                "origin": {"message_id": message_id, "chat_id": chat.get("id")},
            }
            if cbq
            else None
        ),
        "context": {
            "message_id": message_id,
            "update_id": update_id,
            "ts": ts,
            "idempotency_key": (
                f"tg:{message_id}:{user.get('id')}" if message_id and user_id else None
            ),
        },
    }
    return env


# ---------------------------
# Existing helper flows
# ---------------------------

async def handle_location_message(update: Update):
    """Run ``handle_location`` when the update's message carries a location.

    Returns ``True`` if the update was handled, ``False`` otherwise.
    """
    if update.message and update.message.location:
        await handle_location(update)
        return True
    return False


async def _run_exact_command(update: Update, text: str, handlers) -> bool:
    """Await the handler mapped to exactly ``text``; return whether one ran."""
    handler = handlers.get(text)
    if handler is None:
        return False
    await handler(update)
    return True


async def dispatch_citizen_commands(update: Update, text: str):
    """Dispatch the citizen-persona commands; return ``True`` if one matched."""
    # The table is built per call so handler names are resolved at call time.
    return await _run_exact_command(update, text, {
        "/start": start,
        "/help": help_command,
        "/next_truck": next_truck,
        "/report_trash": report_trash,
        "/private_pickup": private_pickup,
        "/green_balance": green_balance,
    })


async def dispatch_city_commands(update: Update, text: str):
    """Dispatch the city-persona commands; return ``True`` if one matched."""
    return await _run_exact_command(update, text, {
        "/start": start,
        "/help": help_command,
        "/next_route": next_route,
        "/complete_stop": complete_stop,
        "/missed_stop": missed_stop,
        "/my_eco_score": city_eco_score,
    })


async def dispatch_private_commands(update: Update, text: str):
    """Dispatch the private-persona commands; return ``True`` if one matched.

    ``/accept_job`` is matched by prefix, so trailing text after the command
    still routes to ``accept_job``; every other command must match exactly.
    """
    if text.startswith("/accept_job"):
        await accept_job(update)
        return True
    return await _run_exact_command(update, text, {
        "/start": start,
        "/help": help_command,
        "/available_jobs": available_jobs,
        "/next_pickup": next_pickup,
        "/complete_pickup": complete_pickup,
        "/my_eco_score": private_eco_score,
    })


def _transcribe_file(path: str):
    """Send the audio file at ``path`` to OpenAI and return the raw result."""
    with open(path, "rb") as audio:
        return openai.audio.transcriptions.create(
            model="gpt-4o-transcribe",  # or "whisper-1"
            file=audio,
            response_format="text",
            language="es",
        )


async def transcribe_with_whisper(update: Update, bot: Bot) -> Optional[str]:
    """Download the voice note of ``update.message`` and transcribe it.

    Args:
        update: Update whose ``message.voice`` holds the voice note.
        bot: Bot used to fetch the file from Telegram.

    Returns:
        The stripped transcript, or ``None`` when the transcription is empty.
    """
    # 1) Download audio from Telegram into a unique temp file.
    tg_file = await bot.get_file(update.message.voice.file_id)
    fd, download_path = tempfile.mkstemp(prefix="tg_voice_", suffix=".ogg")
    os.close(fd)  # only the path is needed; the download reopens the file
    try:
        await tg_file.download_to_drive(download_path)
        # 2) Transcribe (OpenAI). The client call is synchronous, so it runs
        #    in a worker thread instead of blocking the event loop.
        transcript_str = await sync_to_async(_transcribe_file)(download_path)
    finally:
        # Always remove the download so temp files do not accumulate.
        with contextlib.suppress(OSError):
            os.remove(download_path)
    return transcript_str.strip() if transcript_str else None


def _dedupe_key(payload: Dict[str, Any]) -> Optional[str]:
    """Return the cache key identifying ``payload``, or ``None`` if none fits.

    Prefers ``update_id``; falls back to message_id + user_id of ``message``.
    """
    upd_id = payload.get("update_id")
    if upd_id is not None:
        return f"tg:update:{upd_id}"
    message = payload.get("message") or {}
    fallback_msg = message.get("message_id")
    fallback_user = (message.get("from") or {}).get("id")
    if fallback_msg and fallback_user:
        return f"tg:msg:{fallback_msg}:{fallback_user}"
    return None


def _render_demo_spec() -> Dict[str, Any]:
    """Return a fresh render.v1 spec used by the ``/_render_demo`` command."""
    return {
        "schema_version": "render.v1",
        "messages": [
            {"type": "text", "text": "Demo: render_spec text ✅"},
            {
                "type": "photo",
                "media_url": "https://upload.wikimedia.org/wikipedia/commons/5/5f/Alameda_Central_CDMX.jpg",
                "caption": "Demo: render_spec photo ✅",
            },
        ],
        "buttons": [
            {"label": "Abrir Dashboard", "kind": "open_url", "url": "https://app.polisplexity.tech/"},
            {"label": "Re-ejecutar 10’", "kind": "callback_api", "action": "rerun", "params": {"minutes": 10}},
        ],
        "telemetry": {"run_id": "demo-run-001"},
    }


async def _reply_with_llm(update: Update, assistant, prompt: str) -> None:
    """Generate a response for ``prompt`` and send it as a reply.

    ``LangchainAIService`` is constructed and called through
    ``sync_to_async`` because it is used as a synchronous service here.
    """
    assistant_instance = await sync_to_async(LangchainAIService)(assistant)
    bot_response = await sync_to_async(assistant_instance.generate_response)(prompt)
    await update.message.reply_text(bot_response)


# ---------------------------
# Webhook
# ---------------------------

@csrf_exempt
async def telegram_webhook(request, bot_name: str):
    """Handle one Telegram webhook call for the bot named ``bot_name``.

    Flow: method check, bot lookup, JSON parsing, duplicate suppression,
    canonical-envelope logging, then routing in this order: render demo,
    location, voice, persona commands, LLM fallback.

    Returns:
        ``HttpResponse`` 405 for non-POST requests; otherwise a
        ``JsonResponse`` (400 for an unknown bot, a missing assistant or an
        invalid body; 500 on any unexpected error; 200 otherwise).
    """
    try:
        logger.info("Webhook called for bot=%s", bot_name)

        if request.method != "POST":
            return HttpResponse(status=405)

        # Load bot (sync ORM via sync_to_async)
        try:
            bot_instance = await sync_to_async(TelegramBot.objects.get)(
                name=bot_name, is_active=True
            )
        except TelegramBot.DoesNotExist:
            return JsonResponse({"error": f"Bot '{bot_name}' not found."}, status=400)

        # NOTE(review): if `assistant` is a lazily loaded relation, this
        # attribute access performs a sync query inside an async view; confirm.
        if not bot_instance.assistant:
            return JsonResponse({"error": "Assistant not configured."}, status=400)

        # Parse raw payload. Undecodable bytes and non-object JSON are client
        # errors, so they are reported as 400 rather than a generic 500.
        try:
            payload = json.loads(request.body.decode("utf-8") or "{}")
        except (UnicodeDecodeError, json.JSONDecodeError):
            return JsonResponse({"ok": False, "error": "invalid_json"}, status=400)
        if not isinstance(payload, dict):
            return JsonResponse({"ok": False, "error": "invalid_json"}, status=400)

        # ----- Idempotency / retry guard (drops duplicates for ~90s) -----
        # NOTE(review): the key is stored before processing, so a retry that
        # follows a failed attempt is also skipped while the key lives.
        dedupe_key = _dedupe_key(payload)
        # cache.add returns True if the key did not exist (first time), False otherwise
        if dedupe_key and not cache.add(dedupe_key, "1", timeout=_DEDUPE_TTL_SECONDS):
            logger.info("tg.idempotent.skip key=%s", dedupe_key)
            return JsonResponse({"status": "duplicate_skipped"})
        # -----------------------------------------------------------------

        # Build canonical req.v1 (LOG ONLY for now)
        try:
            canon = build_req_v1(payload, bot_name)
            logger.info("tg.canonical env=%s", json.dumps(canon, ensure_ascii=False))
        except Exception as e:
            logger.exception("tg.canonical.failed: %s", e)

        # Convert to telegram.Update; one Bot serves the whole request.
        bot = Bot(token=bot_instance.token)
        update = Update.de_json(payload, bot)

        # --- TEMP: demo renderer (safe to delete later) ----------------------
        # If user sends "/_render_demo", send a text + photo + buttons
        if update.message and (update.message.text or "").strip() == "/_render_demo":
            await render_spec(
                bot=bot, chat_id=update.effective_chat.id, spec=_render_demo_spec()
            )
            return JsonResponse({"status": "ok", "render_demo": True})
        # --------------------------------------------------------------------

        if not update.message:
            # No message (e.g., callback handled elsewhere in legacy); ack anyway
            return JsonResponse({"status": "no message"})

        # 1) Location first
        if await handle_location_message(update):
            return JsonResponse({"status": "ok"})

        # 2) Voice → transcribe → LLM reply
        if update.message.voice:
            transcript = await transcribe_with_whisper(update, bot)
            if not transcript:
                await update.message.reply_text(
                    "No pude entender tu mensaje de voz. Intenta de nuevo."
                )
                return JsonResponse({"status": "ok"})
            await _reply_with_llm(update, bot_instance.assistant, transcript)
            return JsonResponse({"status": "ok"})

        # 3) Text commands by bot persona
        text = update.message.text or ""
        if bot_name == "PepeBasuritaCoinsBot" and await dispatch_citizen_commands(update, text):
            return JsonResponse({"status": "ok"})
        if bot_name == "PepeCamioncitoBot" and await dispatch_city_commands(update, text):
            return JsonResponse({"status": "ok"})
        if bot_name == "PepeMotitoBot" and await dispatch_private_commands(update, text):
            return JsonResponse({"status": "ok"})

        # 4) Fallback LLM for any other text
        await _reply_with_llm(update, bot_instance.assistant, text)
        return JsonResponse({"status": "ok"})

    except Exception as e:
        logger.exception("Error in webhook: %s", e)
        return JsonResponse({"error": f"Unexpected error: {str(e)}"}, status=500)