Neo4j override on telegram bot messages
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
Ekaropolus 2025-05-20 03:27:15 -06:00
parent 38fd3652f7
commit 2aff5888f5
4 changed files with 210 additions and 99 deletions

View File

@ -1,50 +1,52 @@
import json import json
from telegram import Update, Bot import logging
from django.http import JsonResponse from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt from django.views.decorators.csrf import csrf_exempt
from asgiref.sync import sync_to_async from asgiref.sync import sync_to_async
from .models import TelegramBot from telegram import Update, Bot
from django.utils import timezone
from django.conf import settings
from .models import TelegramBot, TelegramConversation, TelegramMessage
from pxy_langchain.services import LangchainAIService from pxy_langchain.services import LangchainAIService
from .handlers import dream_city_command, start, help_command, handle_location from .handlers import dream_city_command, start, help_command, handle_location
import logging
from .models import TelegramConversation, TelegramMessage, TelegramBot
from django.utils import timezone
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@csrf_exempt @csrf_exempt
async def telegram_webhook(request, bot_name): async def telegram_webhook(request, bot_name):
""" """
Webhook view that handles Telegram updates asynchronously and logs messages. Telegram webhook handler that logs inbound/outbound messages and
responds with AI or commands, resilient to Neo4j or send failures.
Always returns 200 OK to prevent Telegram retries.
""" """
try: try:
logger.info(f"Webhook called for bot: {bot_name}") logger.info(f"Webhook called for bot: {bot_name}")
# 1) Fetch the bot instance # 1) Load bot configuration
try: try:
bot_instance = await sync_to_async(TelegramBot.objects.get)( bot_instance = await sync_to_async(TelegramBot.objects.get)(
name=bot_name, is_active=True name=bot_name, is_active=True
) )
logger.info(f"Loaded bot configuration: {bot_instance}")
except TelegramBot.DoesNotExist: except TelegramBot.DoesNotExist:
logger.error(f"Bot '{bot_name}' not found or inactive.") logger.error(f"Bot '{bot_name}' not found or inactive.")
return JsonResponse({"error": f"Bot '{bot_name}' not found."}, status=400) return JsonResponse({"status": "ok", "error": f"Bot '{bot_name}' not found."}, status=200)
# 2) Ensure POST
if request.method != "POST": if request.method != "POST":
logger.warning("Received non-POST request") logger.warning("Received non-POST request to Telegram webhook.")
return JsonResponse({"error": "Invalid request method"}, status=405) return JsonResponse({"status": "ok"}, status=200)
# 2) Parse the incoming update # 3) Parse the update JSON
try: try:
payload = json.loads(request.body.decode("utf-8")) payload = json.loads(request.body.decode("utf-8"))
update = Update.de_json(payload, Bot(token=bot_instance.token)) update = Update.de_json(payload, Bot(token=bot_instance.token))
logger.info(f"Update received: {update}") logger.info(f"Parsed update: {update}")
except Exception as e: except Exception as e:
logger.error(f"Failed to parse update: {e}") logger.error(f"Failed to parse update JSON: {e}")
return JsonResponse({"error": "Invalid JSON payload"}, status=400) return JsonResponse({"status": "ok", "error": "Invalid JSON payload"}, status=200)
# 3) Log conversation & inbound message # 4) Log inbound message
user_id = str(update.effective_user.id) user_id = str(update.effective_user.id)
conv, _ = await sync_to_async(TelegramConversation.objects.get_or_create)( conv, _ = await sync_to_async(TelegramConversation.objects.get_or_create)(
bot=bot_instance, bot=bot_instance,
@ -52,47 +54,62 @@ async def telegram_webhook(request, bot_name):
defaults={'started_at': timezone.now()} defaults={'started_at': timezone.now()}
) )
incoming_text = update.message.text or "" incoming_text = update.message.text or ""
await sync_to_async(TelegramMessage.objects.create)( try:
conversation=conv, await sync_to_async(TelegramMessage.objects.create)(
direction=TelegramMessage.IN, conversation=conv,
content=incoming_text direction=TelegramMessage.IN,
) content=incoming_text
)
except Exception as log_in_err:
logger.error(f"Error logging inbound message: {log_in_err}")
# 4) Route commands or AI # 5) Handle commands or AI response
if update.message: if update.message:
# built-in commands try:
if update.message.text == "/start": text = update.message.text or ""
await start(update) if text == "/start":
elif update.message.text == "/help": await start(update)
await help_command(update) elif text == "/help":
elif update.message.text == "/dream_city": await help_command(update)
await dream_city_command(update) elif text == "/dream_city":
elif update.message.location: await dream_city_command(update)
await handle_location(update) elif update.message.location:
else: await handle_location(update)
# AI fallback else:
assistant_instance = await sync_to_async(LangchainAIService)( # AI fallback with resilience
bot_instance.assistant try:
) assistant = await sync_to_async(LangchainAIService)(bot_instance.assistant)
start_time = timezone.now() start_time = timezone.now()
bot_response = await sync_to_async( bot_response = await sync_to_async(assistant.generate_response)(text)
assistant_instance.generate_response response_time = int((timezone.now() - start_time).total_seconds() * 1000)
)(update.message.text) except Exception as ai_err:
response_time = int((timezone.now() - start_time).total_seconds() * 1000) logger.error(f"AI service error: {ai_err}")
bot_response = "Lo siento, el servicio de IA no está disponible."
response_time = 0
# Send reply # Send reply (skipped in DEBUG)
await update.message.reply_text(bot_response) if not settings.DEBUG:
try:
await update.message.reply_text(bot_response)
except Exception as send_err:
logger.error(f"Error sending message to Telegram: {send_err}")
# 5) Log outbound message # Log outbound message
await sync_to_async(TelegramMessage.objects.create)( try:
conversation=conv, await sync_to_async(TelegramMessage.objects.create)(
direction=TelegramMessage.OUT, conversation=conv,
content=bot_response, direction=TelegramMessage.OUT,
response_time_ms=response_time content=bot_response,
) response_time_ms=response_time
)
except Exception as log_out_err:
logger.error(f"Error logging outbound message: {log_out_err}")
except Exception as cmd_err:
logger.error(f"Error processing Telegram commands: {cmd_err}")
return JsonResponse({"status": "ok"}) # 6) Always return 200 OK
return JsonResponse({"status": "ok"}, status=200)
except Exception as e: except Exception as e:
logger.error(f"Error in telegram_webhook: {e}") logger.error(f"Unexpected error in telegram_webhook: {e}")
return JsonResponse({"error": f"Unexpected error: {e}"}, status=500) return JsonResponse({"status": "ok", "error": str(e)}, status=200)

View File

@ -9,10 +9,9 @@ from datetime import datetime
from langchain.prompts import ChatPromptTemplate from langchain.prompts import ChatPromptTemplate
from langchain_community.chains.graph_qa.cypher import GraphCypherQAChain from langchain_community.chains.graph_qa.cypher import GraphCypherQAChain
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
DEFAULT_PROFILE_NAME = "DefaultNeo4jProfile" # Define a default profile DEFAULT_PROFILE_NAME = "DefaultNeo4jProfile"
class Neo4jDatabase: class Neo4jDatabase:
""" """
@ -28,84 +27,99 @@ class Neo4jDatabase:
logger.warning("No profile specified. Using default Neo4j profile.") logger.warning("No profile specified. Using default Neo4j profile.")
self.profile = Neo4jProfile.objects.get(name=DEFAULT_PROFILE_NAME) self.profile = Neo4jProfile.objects.get(name=DEFAULT_PROFILE_NAME)
# Connect to the assigned Neo4j instance # Attempt to connect to the assigned Neo4j instance
self.graph = Neo4jGraph( try:
url=self.profile.uri, self.graph = Neo4jGraph(
username=self.profile.username, url=self.profile.uri,
password=self.profile.password username=self.profile.username,
) password=self.profile.password
)
except Exception as e:
logger.error(f"Failed to initialize Neo4jGraph: {e}")
self.graph = None
# Load the correct AI model # Initialize LLM if graph is available
self.llm = ChatOpenAI( try:
api_key=self.profile.openai_api_key, self.llm = ChatOpenAI(
model_name=self.profile.model_name, api_key=self.profile.openai_api_key,
temperature=0 model_name=self.profile.model_name,
) temperature=0
)
except Exception as e:
logger.error(f"Failed to initialize ChatOpenAI: {e}")
self.llm = None
# Add the custom prompt to enforce English # Prepare graph transformer only if llm available
english_prompt = ChatPromptTemplate.from_template( if self.llm:
"Transform the following text into a graph structure. All nodes, relationships, and properties should be in English, regardless of the original language." english_prompt = ChatPromptTemplate.from_template(
) "Transform the following text into a graph structure. All nodes, relationships, and properties should be in English, regardless of the original language."
)
# Apply the custom English prompt here try:
self.graph_transformer = LLMGraphTransformer( self.graph_transformer = LLMGraphTransformer(
llm=self.llm, llm=self.llm,
prompt=english_prompt prompt=english_prompt
) )
except Exception as e:
logger.error(f"Failed to initialize LLMGraphTransformer: {e}")
self.graph_transformer = None
else:
self.graph_transformer = None
logger.info(f"Neo4jDatabase initialized with profile: {self.profile.name}") logger.info(f"Neo4jDatabase initialized with profile: {self.profile.name}")
except Neo4jProfile.DoesNotExist: except Neo4jProfile.DoesNotExist:
logger.error(f"Neo4j profile '{profile_name}' not found.") logger.error(f"Neo4j profile '{profile_name}' not found.")
raise self.graph = None
self.llm = None
self.graph_transformer = None
except Exception as e: except Exception as e:
logger.error(f"Failed to initialize Neo4jDatabase: {str(e)}") logger.error(f"Unexpected error initializing Neo4jDatabase: {e}")
raise self.graph = None
self.llm = None
self.graph_transformer = None
def store_interaction(self, user_id, bot_id, user_message, bot_response, platform): def store_interaction(self, user_id, bot_id, user_message, bot_response, platform):
""" """
Stores a chatbot interaction as a structured graph in Neo4j. Stores a chatbot interaction as a structured graph in Neo4j.
Converts messages into `Document` objects to work with `LLMGraphTransformer`. Converts messages into `Document` objects to work with `LLMGraphTransformer`.
""" """
if not self.graph_transformer:
logger.warning("Graph transformer not available, skipping store_interaction.")
return
try: try:
timestamp = datetime.utcnow().isoformat() timestamp = datetime.utcnow().isoformat()
# Convert messages into `Document` objects
documents = [ documents = [
Document(page_content=user_message, metadata={"role": "user", "user_id": user_id, "platform": platform, "created_at": timestamp}), Document(page_content=user_message, metadata={"role": "user", "user_id": user_id, "platform": platform, "created_at": timestamp}),
Document(page_content=bot_response, metadata={"role": "bot", "bot_id": bot_id, "platform": platform, "created_at": timestamp}), Document(page_content=bot_response, metadata={"role": "bot", "bot_id": bot_id, "platform": platform, "created_at": timestamp}),
] ]
# Convert text into structured graph documents
graph_docs = self.graph_transformer.convert_to_graph_documents(documents) graph_docs = self.graph_transformer.convert_to_graph_documents(documents)
if self.graph:
# Store the structured graph data into Neo4j self.graph.add_graph_documents(graph_docs, include_source=True)
self.graph.add_graph_documents(graph_docs, include_source=True) logger.info(f"Stored interaction in Neo4j (Profile: {self.profile.name})")
else:
logger.info(f"Stored interaction in Neo4j (Profile: {self.profile.name})") logger.warning("No Neo4j graph available, skipping add_graph_documents.")
except Exception as e: except Exception as e:
logger.error(f"Failed to store interaction in Neo4j: {str(e)}") logger.error(f"Failed to store interaction in Neo4j: {e}")
raise
def query_graph(self, user_query): def query_graph(self, user_query):
""" """
Queries the graph using GraphCypherQAChain and returns a structured response. Queries the graph using GraphCypherQAChain and returns a structured response.
""" """
logger.info(f"Calling Neo4j {user_query}") if not self.graph or not self.llm:
logger.warning("Graph or LLM not available, cannot query.")
return None
try: try:
# Use AI model to generate Cypher query
qa_chain = GraphCypherQAChain.from_llm( qa_chain = GraphCypherQAChain.from_llm(
llm=self.llm, llm=self.llm,
graph=self.graph, graph=self.graph,
verbose=True, verbose=True,
allow_dangerous_requests=True allow_dangerous_requests=True
) )
result = qa_chain.invoke({"query": user_query}) result = qa_chain.invoke({"query": user_query})
logger.info(f"Resulting Neo4j {result}") logger.info(f"Resulting Neo4j query result: {result}")
return result['result'] return result.get('result')
except Exception as e: except Exception as e:
logger.error(f"Graph query failed: {e}") logger.error(f"Graph query failed: {e}")
return None return None

View File

@ -0,0 +1,21 @@
{
"update_id": 100000000,
"message": {
"message_id": 1,
"from": {
"id": 123456789,
"is_bot": false,
"first_name": "Tester",
"username": "tester_user"
},
"chat": {
"id": 123456789,
"first_name": "Tester",
"username": "tester_user",
"type": "private"
},
"date": 1700000000,
"text": "Hola desde local!"
}
}

59
tests/test_telegram_webhook.sh Executable file
View File

@ -0,0 +1,59 @@
#!/usr/bin/env bash
set -euo pipefail
# === Configuración ===
BOT_NAME="PolisplexityBot"
HOST="localhost"
PORT="8011" # Puerto donde corre tu servidor Django (devserver o Docker mapped)
WEBHOOK_URL="http://${HOST}:${PORT}/bots/webhook/${BOT_NAME}/"
PAYLOAD_FILE="$(dirname "$0")/fake_telegram_update.json"
# === Ejecución ===
echo "=== Test Telegram Webhook ==="
echo "Bot: $BOT_NAME"
echo "URL: $WEBHOOK_URL"
echo "Payload: $PAYLOAD_FILE"
echo
echo "1) Mostrando contenido del payload:"
cat "$PAYLOAD_FILE"
echo
echo "2) Enviando payload al webhook (verbose):"
# Archivo temporal para capturar body
BODY_FILE=$(mktemp)
# Enviamos con curl -v para depuración, guardamos respuesta en BODY_FILE y código en STATUS_CODE
STATUS_CODE=$(curl -v \
-X POST \
-H "Content-Type: application/json" \
--data @"$PAYLOAD_FILE" \
-o "$BODY_FILE" \
-w "%{http_code}" \
"$WEBHOOK_URL" || true)
echo
echo "→ HTTP Status: $STATUS_CODE"
echo "→ Response Body:"
cat "$BODY_FILE"
echo
if [[ "$STATUS_CODE" != "200" ]]; then
echo "❌ ERROR: el webhook devolvió código $STATUS_CODE"
exit 1
fi
echo "3) Verificando en base de datos dentro del contenedor Docker..."
# Usamos docker exec para acceder al entorno con dependencias instaladas
docker exec -i polisplexity_django python manage.py shell << 'EOF'
from pxy_bots.models import TelegramConversation, TelegramMessage
print("Conversations:", TelegramConversation.objects.count())
print("Messages: ", TelegramMessage.objects.count())
for m in TelegramMessage.objects.all():
print(f" • [{m.direction:3}] (conv={m.conversation_id}) rt={m.response_time_ms or 0} ms ⇒ {m.content[:50]!r}")
EOF
# Limpieza temporal
rm -f "$BODY_FILE"
echo "✅ Test completado"