306 lines
11 KiB
Python
306 lines
11 KiB
Python
# pxy_bots/views.py
|
|
import os
|
|
import json
|
|
import logging
|
|
from typing import Any, Dict, Optional
|
|
|
|
import openai
|
|
from telegram import Update, Bot
|
|
from django.http import JsonResponse, HttpResponse
|
|
from django.views.decorators.csrf import csrf_exempt
|
|
from asgiref.sync import sync_to_async
|
|
|
|
from .models import TelegramBot
|
|
from pxy_langchain.services import LangchainAIService
|
|
from .handlers import (
|
|
start, help_command, handle_location,
|
|
next_truck, report_trash, private_pickup, green_balance,
|
|
next_route, complete_stop, missed_stop, city_eco_score,
|
|
available_jobs, accept_job, next_pickup, complete_pickup, private_eco_score
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
openai.api_key = os.getenv("OPENAI_API_KEY")
|
|
|
|
# ---------------------------
|
|
# Canonical req.v1 builder
|
|
# ---------------------------
|
|
|
|
def _pick_photo(sizes):
|
|
if not sizes:
|
|
return None
|
|
sizes = sorted(sizes, key=lambda s: (s.get("width", 0) * s.get("height", 0)), reverse=True)
|
|
top = sizes[0]
|
|
return {
|
|
"type": "photo",
|
|
"file_id": top.get("file_id"),
|
|
"mime": "image/jpeg",
|
|
"size_bytes": None,
|
|
"width": top.get("width"),
|
|
"height": top.get("height"),
|
|
}
|
|
|
|
def _extract_media(msg: Dict[str, Any]) -> Optional[Dict[str, Any]]:
|
|
if "photo" in msg:
|
|
return _pick_photo(msg.get("photo") or [])
|
|
if "voice" in msg:
|
|
v = msg["voice"]
|
|
return {"type": "voice", "file_id": v.get("file_id"), "mime": v.get("mime_type"), "size_bytes": v.get("file_size"), "duration": v.get("duration")}
|
|
if "audio" in msg:
|
|
a = msg["audio"]
|
|
return {"type": "audio", "file_id": a.get("file_id"), "mime": a.get("mime_type"), "size_bytes": a.get("file_size"), "duration": a.get("duration")}
|
|
if "video" in msg:
|
|
v = msg["video"]
|
|
return {"type": "video", "file_id": v.get("file_id"), "mime": v.get("mime_type"), "size_bytes": v.get("file_size"), "duration": v.get("duration"), "width": v.get("width"), "height": v.get("height")}
|
|
if "video_note" in msg:
|
|
v = msg["video_note"]
|
|
return {"type": "video_note", "file_id": v.get("file_id"), "mime": None, "size_bytes": v.get("file_size"), "duration": v.get("duration"), "length": v.get("length")}
|
|
if "animation" in msg:
|
|
a = msg["animation"]
|
|
return {"type": "animation", "file_id": a.get("file_id"), "mime": a.get("mime_type"), "size_bytes": a.get("file_size")}
|
|
if "document" in msg:
|
|
d = msg["document"]
|
|
return {"type": "document", "file_id": d.get("file_id"), "mime": d.get("mime_type"), "size_bytes": d.get("file_size"), "file_name": d.get("file_name")}
|
|
return None
|
|
|
|
def build_req_v1(update: Dict[str, Any], bot_name: str) -> Dict[str, Any]:
    """
    Normalize a Telegram update into our canonical req.v1 envelope.

    Pure function. No network, no state.

    Args:
        update: Raw Telegram update payload, already decoded from JSON.
        bot_name: Name of the bot that received the update; copied into
            ``bot.username``.

    Returns:
        A dict with the keys ``schema_version``, ``bot``, ``chat``, ``user``,
        ``command``, ``input``, ``callback`` and ``context``. Fields the
        update does not provide are None.
    """
    msg = update.get("message") or update.get("edited_message")
    cbq = update.get("callback_query")

    # Only a message can supply these; callbacks and unknown updates keep None.
    text = caption = location = media = None

    if msg:
        chat = msg.get("chat") or {}
        user = msg.get("from") or {}
        message_id = msg.get("message_id")
        ts = msg.get("date")
        text = msg.get("text")
        caption = msg.get("caption")
        location = msg.get("location")
        media = _extract_media(msg)
        trigger = "message"
    elif cbq:
        # A callback refers back to the message that carried the button.
        origin = cbq.get("message") or {}
        chat = origin.get("chat") or {}
        user = cbq.get("from") or {}
        message_id = origin.get("message_id")
        ts = origin.get("date") or None
        trigger = "callback"
    else:
        chat = {}
        user = update.get("from") or {}
        message_id = None
        ts = None
        trigger = "unknown"

    # A command is the first token of the text (or, failing that, the caption)
    # when it starts with "/". Callbacks carry 'data' instead and never set it.
    raw_cmd = None
    command_source = next(
        (s for s in (text, caption) if isinstance(s, str) and s.startswith("/")),
        None,
    )
    if command_source is not None:
        token = command_source.split()[0]
        # Drop the leading "/" and any "@BotUsername" suffix (group chats send
        # "/cmd@BotUsername"). A bare "/" is not a command, hence `or None`.
        raw_cmd = token[1:].partition("@")[0] or None

    user_id = user.get("id")
    chat_id = chat.get("id")

    env = {
        "schema_version": "req.v1",
        "bot": {"username": bot_name},
        "chat": {"id": chat_id, "type": chat.get("type")},
        "user": {"id": user_id, "language": user.get("language_code")},
        "command": {
            "name": raw_cmd,
            "version": 1,
            # `trigger` is already "callback" for callbacks; only a message
            # that carries a command is promoted to "text_command".
            "trigger": "text_command" if raw_cmd and trigger == "message" else trigger,
        },
        "input": {
            "text": text,
            "caption": caption,
            "args_raw": text or caption,
            "media": media,
            "location": (
                {"lat": location.get("latitude"), "lon": location.get("longitude")}
                if location
                else None
            ),
        },
        "callback": (
            {
                "id": cbq.get("id"),
                "data": cbq.get("data"),
                "origin": {"message_id": message_id, "chat_id": chat_id},
            }
            if cbq
            else None
        ),
        "context": {
            "message_id": message_id,
            "update_id": update.get("update_id"),
            "ts": ts,
            # NOTE(review): the key omits the chat id; if message ids are only
            # unique per chat, two chats could produce the same key - confirm.
            "idempotency_key": (
                f"tg:{message_id}:{user_id}" if message_id and user_id else None
            ),
        },
    }
    return env
|
|
|
|
# ---------------------------
|
|
# Existing helper flows
|
|
# ---------------------------
|
|
|
|
async def handle_location_message(update: Update):
    """Hand a location-bearing message to ``handle_location``.

    Returns:
        True when the update has a message with a location and the handler
        was awaited; False otherwise (the handler is not called).
    """
    if not (update.message and update.message.location):
        return False
    await handle_location(update)
    return True
|
|
|
|
async def dispatch_citizen_commands(update: Update, text: str):
    """Run the citizen-command handler whose command equals ``text``.

    Args:
        update: Update passed through to the matching handler.
        text: Full message text; it must equal a command exactly.

    Returns:
        True if a handler was awaited, False when ``text`` is not one of the
        commands listed here.
    """
    # Each entry defers its handler call until the command has matched.
    routes = {
        "/start": lambda: start(update),
        "/help": lambda: help_command(update),
        "/next_truck": lambda: next_truck(update),
        "/report_trash": lambda: report_trash(update),
        "/private_pickup": lambda: private_pickup(update),
        "/green_balance": lambda: green_balance(update),
    }
    run = routes.get(text)
    if run is None:
        return False
    await run()
    return True
|
|
|
|
async def dispatch_city_commands(update: Update, text: str):
    """Run the city-command handler whose command equals ``text``.

    Args:
        update: Update passed through to the matching handler.
        text: Full message text; it must equal a command exactly.

    Returns:
        True if a handler was awaited, False when ``text`` is not one of the
        commands listed here.
    """
    # Each entry defers its handler call until the command has matched.
    routes = {
        "/start": lambda: start(update),
        "/help": lambda: help_command(update),
        "/next_route": lambda: next_route(update),
        "/complete_stop": lambda: complete_stop(update),
        "/missed_stop": lambda: missed_stop(update),
        "/my_eco_score": lambda: city_eco_score(update),
    }
    run = routes.get(text)
    if run is None:
        return False
    await run()
    return True
|
|
|
|
async def dispatch_private_commands(update: Update, text: str):
    """Run the private-command handler selected by ``text``.

    Every command must equal ``text`` exactly, except ``/accept_job``, which
    matches any text that starts with it.

    Args:
        update: Update passed through to the matching handler.
        text: Full message text.

    Returns:
        True if a handler was awaited, False when nothing matched.
    """
    # Exact-match commands; each entry defers its handler call until chosen.
    exact_routes = {
        "/start": lambda: start(update),
        "/help": lambda: help_command(update),
        "/available_jobs": lambda: available_jobs(update),
        "/next_pickup": lambda: next_pickup(update),
        "/complete_pickup": lambda: complete_pickup(update),
        "/my_eco_score": lambda: private_eco_score(update),
    }
    run = exact_routes.get(text)
    if run is None and text.startswith("/accept_job"):
        # No exact command shares this prefix, so checking it last is safe.
        run = lambda: accept_job(update)
    if run is None:
        return False
    await run()
    return True
|
|
|
|
async def transcribe_with_whisper(update: Update, bot: Bot) -> Optional[str]:
    """Download the voice note on ``update`` and transcribe it with OpenAI.

    Args:
        update: Update whose ``message.voice`` holds the recording.
        bot: Bot used to fetch the file from Telegram.

    Returns:
        The stripped transcript, or None when the transcription is empty.
    """
    import tempfile  # local import keeps this block self-contained

    def _transcribe(path: str):
        # Blocking OpenAI call; run off the event loop by the caller below.
        with open(path, "rb") as audio:
            return openai.audio.transcriptions.create(
                model="gpt-4o-transcribe",  # or "whisper-1"
                file=audio,
                response_format="text",
                language="es",
            )

    # 1) Download audio from Telegram
    tg_file = await bot.get_file(update.message.voice.file_id)

    # A private temp directory avoids building a path from the payload's
    # file_id and is removed on exit, even if a step below raises.
    with tempfile.TemporaryDirectory(prefix="tg_voice_") as workdir:
        download_path = os.path.join(workdir, "voice.ogg")
        await tg_file.download_to_drive(download_path)

        # 2) Transcribe (OpenAI) in a worker thread so other requests keep
        # being served while the call is in flight.
        transcript_str = await sync_to_async(_transcribe, thread_sensitive=False)(
            download_path
        )

    return transcript_str.strip() if transcript_str else None
|
|
|
|
# ---------------------------
|
|
# Webhook
|
|
# ---------------------------
|
|
|
|
async def _reply_with_llm(update: Update, assistant, prompt: str) -> None:
    """Send ``prompt`` to the bot's assistant and reply with its answer."""
    # Both the service constructor and the generation call are synchronous.
    service = await sync_to_async(LangchainAIService)(assistant)
    answer = await sync_to_async(service.generate_response)(prompt)
    await update.message.reply_text(answer)


@csrf_exempt
async def telegram_webhook(request, bot_name: str):
    """Handle one Telegram webhook delivery for the bot named ``bot_name``.

    Processing order: location messages, then voice notes (transcribed and
    answered by the LLM), then the per-bot text commands, then an LLM reply
    for any other text.

    Args:
        request: Incoming Django request; only POST is accepted.
        bot_name: Name used to look up an active ``TelegramBot`` row.

    Returns:
        405 for non-POST requests; 400 JSON when the bot is unknown, has no
        assistant, or the body is not a JSON object; 200 JSON with a
        ``status`` field once the update is handled or skipped; 500 JSON on
        any unexpected error.
    """
    try:
        logger.info("Webhook called for bot=%s", bot_name)

        if request.method != "POST":
            return HttpResponse(status=405)

        # Load bot (sync ORM via sync_to_async)
        try:
            bot_instance = await sync_to_async(TelegramBot.objects.get)(
                name=bot_name, is_active=True
            )
        except TelegramBot.DoesNotExist:
            return JsonResponse({"error": f"Bot '{bot_name}' not found."}, status=400)

        if not bot_instance.assistant:
            return JsonResponse({"error": "Assistant not configured."}, status=400)

        # Parse raw payload. Undecodable bytes and non-object JSON are client
        # errors, not server faults.
        try:
            payload = json.loads(request.body.decode("utf-8") or "{}")
        except (UnicodeDecodeError, json.JSONDecodeError):
            return JsonResponse({"ok": False, "error": "invalid_json"}, status=400)
        if not isinstance(payload, dict):
            return JsonResponse({"ok": False, "error": "invalid_json"}, status=400)

        # Build canonical req.v1 (LOG ONLY for now); a failure here must not
        # stop the legacy flow below, so it is logged and ignored.
        try:
            canon = build_req_v1(payload, bot_name)
            logger.info("tg.canonical env=%s", json.dumps(canon, ensure_ascii=False))
        except Exception as e:
            logger.exception("tg.canonical.failed: %s", e)

        # Convert to telegram.Update; one Bot instance serves the whole request.
        bot = Bot(token=bot_instance.token)
        update = Update.de_json(payload, bot)

        if update is None or not update.message:
            # No message (e.g., callback handled elsewhere in legacy); ack anyway
            return JsonResponse({"status": "no message"})

        # 1) Location first
        if await handle_location_message(update):
            return JsonResponse({"status": "ok"})

        # 2) Voice → transcribe → LLM reply
        if update.message.voice:
            transcript = await transcribe_with_whisper(update, bot)
            if not transcript:
                await update.message.reply_text("No pude entender tu mensaje de voz. Intenta de nuevo.")
                return JsonResponse({"status": "ok"})
            await _reply_with_llm(update, bot_instance.assistant, transcript)
            return JsonResponse({"status": "ok"})

        # 3) Text commands by bot persona
        text = update.message.text or ""
        persona_dispatchers = {
            "PepeBasuritaCoinsBot": dispatch_citizen_commands,
            "PepeCamioncitoBot": dispatch_city_commands,
            "PepeMotitoBot": dispatch_private_commands,
        }
        dispatcher = persona_dispatchers.get(bot_name)
        if dispatcher is not None and await dispatcher(update, text):
            return JsonResponse({"status": "ok"})

        # 4) Fallback LLM for any other text
        await _reply_with_llm(update, bot_instance.assistant, text)

        return JsonResponse({"status": "ok"})

    except Exception as e:
        logger.exception("Error in webhook: %s", e)
        # The details stay in the log; the response must not echo internal
        # exception text back to the caller.
        return JsonResponse({"error": "Unexpected error."}, status=500)
|