Ekaropolus 577de989b9
Some checks failed
continuous-integration/drone/push Build is failing
Telgram Bot Handler for coordinates
2025-05-29 09:09:54 -06:00

83 lines
3.0 KiB
Python

from telegram import Update, ForceReply
import logging
from pxy_openai.assistants import OpenAIAssistant
from .models import TelegramBot
from asgiref.sync import sync_to_async
from pxy_langchain.models import AIAssistant
from pxy_langchain.services import LangchainAIService
logger = logging.getLogger(__name__)
async def dream_city_command(update: Update):
"""Send a message with a link to the random city generator."""
message = (
"Descubre la ciudad de tus sueños! Haz clic en el enlace para explorar una ciudad generada aleatoriamente que podría reflejar tus aspiraciones urbanas: "
"https://app.polisplexity.tech/city/digital/twin/dream/?innovation=30&technology=30&science=40"
)
await update.message.reply_text(message)
async def start(update: Update):
"""Send a message when the command /start is issued."""
user = update.effective_user
await update.message.reply_html(
rf"Hi {user.mention_html()}!",
reply_markup=ForceReply(selective=True),
)
async def help_command(update: Update):
"""Send a message when the command /help is issued."""
user = update.effective_user
await update.message.reply_text(f"Help! How can I assist you, {user.first_name}?")
async def handle_location(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
"""Respond to a location message by sending back the dynamic Digital Twin link."""
location = update.message.location
if not location:
await update.message.reply_text("Please share your location.")
return
lat = location.latitude
lon = location.longitude
reply = (
"🚀 Your Digital Twin is ready!\n\n"
f"🌍 Latitude: {lat}\n"
f"🌍 Longitude: {lon}\n\n"
"🔗 Access it here:\n"
f"https://app.polisplexity.tech/city/digital/twin/osm_city/?lat={lat}&long={lon}&scale=0.1\n\n"
"Thanks for sharing your location!"
)
await update.message.reply_text(reply)
async def respond(update, bot_name):
"""Respond to user messages using the LangChain AI service."""
try:
user_message = update.message.text
# Fetch the Telegram bot and its assigned AI assistant asynchronously
telegram_bot = await sync_to_async(TelegramBot.objects.get)(name=bot_name, is_active=True)
if not telegram_bot.assistant:
raise ValueError(f"No assistant configured for bot '{bot_name}'.")
# Fetch the AI assistant linked to this bot
assistant = await sync_to_async(AIAssistant.objects.get)(name=telegram_bot.assistant.name)
# Initialize AI service
ai_service = LangchainAIService(assistant)
# Generate response asynchronously
bot_response = await sync_to_async(ai_service.generate_response)(user_message)
await update.message.reply_text(bot_response)
except TelegramBot.DoesNotExist:
await update.message.reply_text(f"Bot '{bot_name}' not found or inactive.")
except Exception as e:
await update.message.reply_text(f"Oops! Something went wrong: {e}")