From c748a69bd42fe7885aa396778a770991a7928562 Mon Sep 17 00:00:00 2001 From: Ekaropolus Date: Wed, 17 Sep 2025 00:49:52 -0600 Subject: [PATCH] Views for bots de harcoded handlers --- pxy_bots/views.py | 235 ++++++++-------------------------------------- 1 file changed, 38 insertions(+), 197 deletions(-) diff --git a/pxy_bots/views.py b/pxy_bots/views.py index 7f8d869..6851a71 100644 --- a/pxy_bots/views.py +++ b/pxy_bots/views.py @@ -1,35 +1,19 @@ # pxy_bots/views.py -import os import json import logging from typing import Any, Dict, Optional -import openai -from telegram import Update, Bot +from telegram import Bot from django.http import JsonResponse, HttpResponse from django.views.decorators.csrf import csrf_exempt from django.core.cache import cache - - -from .models import TelegramBot -from pxy_langchain.services import LangchainAIService -from .handlers import ( - start, help_command, handle_location, - next_truck, report_trash, private_pickup, green_balance, - next_route, complete_stop, missed_stop, city_eco_score, - available_jobs, accept_job, next_pickup, complete_pickup, private_eco_score -) - - -from .router import pick_db_route, pick_url, post_json -from .renderer import render_spec from asgiref.sync import sync_to_async - - +from .models import TelegramBot +from .router import pick_db_route, pick_url, post_json +from .renderer import render_spec logger = logging.getLogger(__name__) -openai.api_key = os.getenv("OPENAI_API_KEY") # --------------------------- @@ -55,22 +39,30 @@ def _extract_media(msg: Dict[str, Any]) -> Optional[Dict[str, Any]]: return _pick_photo(msg.get("photo") or []) if "voice" in msg: v = msg["voice"] - return {"type": "voice", "file_id": v.get("file_id"), "mime": v.get("mime_type"), "size_bytes": v.get("file_size"), "duration": v.get("duration")} + return {"type": "voice", "file_id": v.get("file_id"), "mime": v.get("mime_type"), + "size_bytes": v.get("file_size"), "duration": v.get("duration")} if "audio" in msg: a = msg["audio"] - return {"type": "audio", "file_id": a.get("file_id"), "mime": a.get("mime_type"), "size_bytes": a.get("file_size"), "duration": a.get("duration")} + return {"type": "audio", "file_id": a.get("file_id"), "mime": a.get("mime_type"), + "size_bytes": a.get("file_size"), "duration": a.get("duration")} if "video" in msg: v = msg["video"] - return {"type": "video", "file_id": v.get("file_id"), "mime": v.get("mime_type"), "size_bytes": v.get("file_size"), "duration": v.get("duration"), "width": v.get("width"), "height": v.get("height")} + return {"type": "video", "file_id": v.get("file_id"), "mime": v.get("mime_type"), + "size_bytes": v.get("file_size"), "duration": v.get("duration"), + "width": v.get("width"), "height": v.get("height")} if "video_note" in msg: v = msg["video_note"] - return {"type": "video_note", "file_id": v.get("file_id"), "mime": None, "size_bytes": v.get("file_size"), "duration": v.get("duration"), "length": v.get("length")} + return {"type": "video_note", "file_id": v.get("file_id"), "mime": None, + "size_bytes": v.get("file_size"), "duration": v.get("duration"), + "length": v.get("length")} if "animation" in msg: a = msg["animation"] - return {"type": "animation", "file_id": a.get("file_id"), "mime": a.get("mime_type"), "size_bytes": a.get("file_size")} + return {"type": "animation", "file_id": a.get("file_id"), "mime": a.get("mime_type"), + "size_bytes": a.get("file_size")} if "document" in msg: d = msg["document"] - return {"type": "document", "file_id": d.get("file_id"), "mime": d.get("mime_type"), "size_bytes": d.get("file_size"), "file_name": d.get("file_name")} + return {"type": "document", "file_id": d.get("file_id"), "mime": d.get("mime_type"), + "size_bytes": d.get("file_size"), "file_name": d.get("file_name")} return None def build_req_v1(update: Dict[str, Any], bot_name: str) -> Dict[str, Any]: @@ -158,87 +150,7 @@ def build_req_v1(update: Dict[str, Any], bot_name: str) -> Dict[str, Any]: # --------------------------- -# Existing helper flows -# --------------------------- - -async def handle_location_message(update: Update): - if update.message and update.message.location: - await handle_location(update) - return True - return False - -async def dispatch_citizen_commands(update: Update, text: str): - if text == "/start": - await start(update) - elif text == "/help": - await help_command(update) - elif text == "/next_truck": - await next_truck(update) - elif text == "/report_trash": - await report_trash(update) - elif text == "/private_pickup": - await private_pickup(update) - elif text == "/green_balance": - await green_balance(update) - else: - return False - return True - -async def dispatch_city_commands(update: Update, text: str): - if text == "/start": - await start(update) - elif text == "/help": - await help_command(update) - elif text == "/next_route": - await next_route(update) - elif text == "/complete_stop": - await complete_stop(update) - elif text == "/missed_stop": - await missed_stop(update) - elif text == "/my_eco_score": - await city_eco_score(update) - else: - return False - return True - -async def dispatch_private_commands(update: Update, text: str): - if text == "/start": - await start(update) - elif text == "/help": - await help_command(update) - elif text == "/available_jobs": - await available_jobs(update) - elif text.startswith("/accept_job"): - await accept_job(update) - elif text == "/next_pickup": - await next_pickup(update) - elif text == "/complete_pickup": - await complete_pickup(update) - elif text == "/my_eco_score": - await private_eco_score(update) - else: - return False - return True - -async def transcribe_with_whisper(update: Update, bot: Bot) -> Optional[str]: - # 1) Download audio from Telegram - tg_file = await bot.get_file(update.message.voice.file_id) - download_path = f"/tmp/{update.message.voice.file_id}.ogg" - await tg_file.download_to_drive(download_path) - - # 2) Transcribe (OpenAI) - with open(download_path, "rb") as audio: - transcript_str = openai.audio.transcriptions.create( - model="gpt-4o-transcribe", # or "whisper-1" - file=audio, - response_format="text", - language="es", - ) - return transcript_str.strip() if transcript_str else None - - -# --------------------------- -# Webhook +# Webhook (thin router) # --------------------------- @csrf_exempt @@ -257,9 +169,6 @@ async def telegram_webhook(request, bot_name: str): except TelegramBot.DoesNotExist: return JsonResponse({"error": f"Bot '{bot_name}' not found."}, status=400) - if not bot_instance.assistant: - return JsonResponse({"error": "Assistant not configured."}, status=400) - # Parse raw payload try: payload = json.loads(request.body.decode("utf-8") or "{}") @@ -288,16 +197,16 @@ async def telegram_webhook(request, bot_name: str): return JsonResponse({"status": "duplicate_skipped"}) # ----------------------------------------------------------------- - # Build canonical req.v1 (log only for now) + # Build canonical req.v1 try: canon = build_req_v1(payload, bot_name) logger.info("tg.canonical env=%s", json.dumps(canon, ensure_ascii=False)) except Exception as e: logger.exception("tg.canonical.failed: %s", e) - canon = {} + return JsonResponse({"ok": False, "error": "canonical_failed"}, status=400) - # ----- DB-driven route (Admin) ----- - route = pick_db_route(bot_name, canon) if 'canon' in locals() else None + # ----- DB-driven route (Admin) ----- + route = pick_db_route(bot_name, canon) if route: chat_id = (canon.get("chat") or {}).get("id") status, body = await sync_to_async(post_json)( @@ -309,111 +218,43 @@ async def telegram_webhook(request, bot_name: str): if isinstance(body, dict) and ("messages" in body or body.get("schema_version") == "render.v1"): spec = body elif isinstance(body, dict) and "text" in body: - spec = {"schema_version": "render.v1", "messages": [{"type": "text", "text": str(body["text"])}]} + spec = {"schema_version": "render.v1", + "messages": [{"type": "text", "text": str(body["text"])}]} if spec and chat_id: bot = Bot(token=bot_instance.token) await render_spec(bot=bot, chat_id=chat_id, spec=spec) return JsonResponse({"status": "ok", "routed": True, "status_code": status}) - # ----------------------------------- - - # Try routing via in-memory map. If a URL exists, post req.v1 and render the response. + # ----- In-memory fallback (optional; remove when all routes live in DB) ----- try: route_url = pick_url(bot_name, canon) except Exception: route_url = None if route_url: - # do blocking HTTP off the event loop status, body = await sync_to_async(post_json)(route_url, canon) - logger.info("tg.routed url=%s status=%s", route_url, status) + logger.info("tg.routed(mem) url=%s status=%s", route_url, status) - # Accept either a full render_spec or any dict with a "messages" array spec = None if isinstance(body, dict) and ("messages" in body or body.get("schema_version") == "render.v1"): spec = body elif isinstance(body, dict) and "text" in body: - # lenient: wrap plain text into a render_spec - spec = {"schema_version": "render.v1", "messages": [{"type": "text", "text": str(body["text"])}]} + spec = {"schema_version": "render.v1", + "messages": [{"type": "text", "text": str(body["text"])}]} - if spec: - chat_id = (canon.get("chat") or {}).get("id") - if chat_id: - bot = Bot(token=bot_instance.token) - await render_spec(bot=bot, chat_id=chat_id, spec=spec) + chat_id = (canon.get("chat") or {}).get("id") + if spec and chat_id: + bot = Bot(token=bot_instance.token) + await render_spec(bot=bot, chat_id=chat_id, spec=spec) return JsonResponse({"status": "ok", "routed": True, "status_code": status}) - # If no spec, fall through to legacy handlers, but return routing status for visibility - logger.warning("tg.routed.no_spec url=%s body_keys=%s", route_url, list(body.keys()) if isinstance(body, dict) else type(body)) - - - # Convert to telegram.Update - update = Update.de_json(payload, Bot(token=bot_instance.token)) - - # --- TEMP demo: send a text + photo + buttons ---------------------- - if update.message and (update.message.text or "").strip() == "/_render_demo": - bot = Bot(token=bot_instance.token) - spec = { - "schema_version": "render.v1", - "messages": [ - {"type": "text", "text": "Demo: render_spec text ✅"}, - { - "type": "photo", - # Use a known-good image URL (or host your own under /static/) - "media_url": "https://picsum.photos/seed/polisplexity/800/480.jpg", - "caption": "Demo: render_spec photo ✅" - } - ], - "buttons": [ - {"label": "Abrir Dashboard", "kind": "open_url", "url": "https://app.polisplexity.tech/"}, - {"label": "Re-ejecutar 10’", "kind": "callback_api", "action": "rerun", "params": {"minutes": 10}} - ], - "telemetry": {"run_id": "demo-run-001"} - } - await render_spec(bot=bot, chat_id=update.effective_chat.id, spec=spec) - return JsonResponse({"status": "ok", "render_demo": True}) - # ------------------------------------------------------------------- - - if not update.message: - # No message (e.g., callback handled elsewhere in legacy); ack anyway - return JsonResponse({"status": "no message"}) - - # 1) Location first - if await handle_location_message(update): - return JsonResponse({"status": "ok"}) - - # 2) Voice → transcribe → LLM reply - if update.message.voice: - bot = Bot(token=bot_instance.token) - transcript = await transcribe_with_whisper(update, bot) - if not transcript: - await update.message.reply_text("No pude entender tu mensaje de voz. Intenta de nuevo.") - return JsonResponse({"status": "ok"}) - assistant_instance = await sync_to_async(LangchainAIService)(bot_instance.assistant) - bot_response = await sync_to_async(assistant_instance.generate_response)(transcript) - await update.message.reply_text(bot_response) - return JsonResponse({"status": "ok"}) - - # 3) Text commands by bot persona - text = update.message.text or "" - if bot_name == "PepeBasuritaCoinsBot" and await dispatch_citizen_commands(update, text): - return JsonResponse({"status": "ok"}) - if bot_name == "PepeCamioncitoBot" and await dispatch_city_commands(update, text): - return JsonResponse({"status": "ok"}) - if bot_name == "PepeMotitoBot" and await dispatch_private_commands(update, text): - return JsonResponse({"status": "ok"}) - - # 4) Fallback LLM for any other text - assistant_instance = await sync_to_async(LangchainAIService)(bot_instance.assistant) - bot_response = await sync_to_async(assistant_instance.generate_response)(text) - await update.message.reply_text(bot_response) - - return JsonResponse({"status": "ok"}) + # Nothing matched → 204 (ack, but no action) + logger.warning("tg.no_route bot=%s trigger=%s cmd=%s", bot_name, + (canon.get("command") or {}).get("trigger"), + (canon.get("command") or {}).get("name")) + return JsonResponse({"status": "no_route"}, status=204) except Exception as e: logger.exception("Error in webhook: %s", e) return JsonResponse({"error": f"Unexpected error: {str(e)}"}, status=500) - - -