From 2aff5888f55606bab5d75ec82b0e860ae7e84cfc Mon Sep 17 00:00:00 2001 From: Ekaropolus Date: Tue, 20 May 2025 03:27:15 -0600 Subject: [PATCH] Neo4j override on telegram bot messages --- pxy_bots/views.py | 125 ++++++++++++++++++-------------- pxy_neo4j/neo4j_connector.py | 104 ++++++++++++++------------ tests/fake_telegram_update.json | 21 ++++++ tests/test_telegram_webhook.sh | 59 +++++++++++++++ 4 files changed, 210 insertions(+), 99 deletions(-) create mode 100644 tests/fake_telegram_update.json create mode 100755 tests/test_telegram_webhook.sh diff --git a/pxy_bots/views.py b/pxy_bots/views.py index 0677a51..c0306a7 100644 --- a/pxy_bots/views.py +++ b/pxy_bots/views.py @@ -1,50 +1,52 @@ import json -from telegram import Update, Bot +import logging from django.http import JsonResponse from django.views.decorators.csrf import csrf_exempt from asgiref.sync import sync_to_async -from .models import TelegramBot +from telegram import Update, Bot +from django.utils import timezone +from django.conf import settings + +from .models import TelegramBot, TelegramConversation, TelegramMessage from pxy_langchain.services import LangchainAIService from .handlers import dream_city_command, start, help_command, handle_location -import logging -from .models import TelegramConversation, TelegramMessage, TelegramBot -from django.utils import timezone - logger = logging.getLogger(__name__) @csrf_exempt async def telegram_webhook(request, bot_name): """ - Webhook view that handles Telegram updates asynchronously and logs messages. + Telegram webhook handler that logs inbound/outbound messages and + responds with AI or commands, resilient to Neo4j or send failures. + Always returns 200 OK to prevent Telegram retries. """ try: logger.info(f"Webhook called for bot: {bot_name}") - # 1) Fetch the bot instance + # 1) Load bot configuration try: bot_instance = await sync_to_async(TelegramBot.objects.get)( name=bot_name, is_active=True ) - logger.info(f"Loaded bot configuration: {bot_instance}") except TelegramBot.DoesNotExist: logger.error(f"Bot '{bot_name}' not found or inactive.") - return JsonResponse({"error": f"Bot '{bot_name}' not found."}, status=400) + return JsonResponse({"status": "ok", "error": f"Bot '{bot_name}' not found."}, status=200) + # 2) Ensure POST if request.method != "POST": - logger.warning("Received non-POST request") - return JsonResponse({"error": "Invalid request method"}, status=405) + logger.warning("Received non-POST request to Telegram webhook.") + return JsonResponse({"status": "ok"}, status=200) - # 2) Parse the incoming update + # 3) Parse the update JSON try: payload = json.loads(request.body.decode("utf-8")) update = Update.de_json(payload, Bot(token=bot_instance.token)) - logger.info(f"Update received: {update}") + logger.info(f"Parsed update: {update}") except Exception as e: - logger.error(f"Failed to parse update: {e}") - return JsonResponse({"error": "Invalid JSON payload"}, status=400) + logger.error(f"Failed to parse update JSON: {e}") + return JsonResponse({"status": "ok", "error": "Invalid JSON payload"}, status=200) - # 3) Log conversation & inbound message + # 4) Log inbound message user_id = str(update.effective_user.id) conv, _ = await sync_to_async(TelegramConversation.objects.get_or_create)( bot=bot_instance, @@ -52,47 +54,62 @@ async def telegram_webhook(request, bot_name): defaults={'started_at': timezone.now()} ) incoming_text = update.message.text or "" - await sync_to_async(TelegramMessage.objects.create)( - conversation=conv, - direction=TelegramMessage.IN, - content=incoming_text - ) + try: + await sync_to_async(TelegramMessage.objects.create)( + conversation=conv, + direction=TelegramMessage.IN, + content=incoming_text + ) + except Exception as log_in_err: + logger.error(f"Error logging inbound message: {log_in_err}") - # 4) Route commands or AI + # 5) Handle commands or AI response if update.message: - # built-in commands - if update.message.text == "/start": - await start(update) - elif update.message.text == "/help": - await help_command(update) - elif update.message.text == "/dream_city": - await dream_city_command(update) - elif update.message.location: - await handle_location(update) - else: - # AI fallback - assistant_instance = await sync_to_async(LangchainAIService)( - bot_instance.assistant - ) - start_time = timezone.now() - bot_response = await sync_to_async( - assistant_instance.generate_response - )(update.message.text) - response_time = int((timezone.now() - start_time).total_seconds() * 1000) + try: + text = update.message.text or "" + if text == "/start": + await start(update) + elif text == "/help": + await help_command(update) + elif text == "/dream_city": + await dream_city_command(update) + elif update.message.location: + await handle_location(update) + else: + # AI fallback with resilience + try: + assistant = await sync_to_async(LangchainAIService)(bot_instance.assistant) + start_time = timezone.now() + bot_response = await sync_to_async(assistant.generate_response)(text) + response_time = int((timezone.now() - start_time).total_seconds() * 1000) + except Exception as ai_err: + logger.error(f"AI service error: {ai_err}") + bot_response = "Lo siento, el servicio de IA no está disponible." + response_time = 0 - # Send reply - await update.message.reply_text(bot_response) + # Send reply (skipped in DEBUG) + if not settings.DEBUG: + try: + await update.message.reply_text(bot_response) + except Exception as send_err: + logger.error(f"Error sending message to Telegram: {send_err}") - # 5) Log outbound message - await sync_to_async(TelegramMessage.objects.create)( - conversation=conv, - direction=TelegramMessage.OUT, - content=bot_response, - response_time_ms=response_time - ) + # Log outbound message + try: + await sync_to_async(TelegramMessage.objects.create)( + conversation=conv, + direction=TelegramMessage.OUT, + content=bot_response, + response_time_ms=response_time + ) + except Exception as log_out_err: + logger.error(f"Error logging outbound message: {log_out_err}") + except Exception as cmd_err: + logger.error(f"Error processing Telegram commands: {cmd_err}") - return JsonResponse({"status": "ok"}) + # 6) Always return 200 OK + return JsonResponse({"status": "ok"}, status=200) except Exception as e: - logger.error(f"Error in telegram_webhook: {e}") - return JsonResponse({"error": f"Unexpected error: {e}"}, status=500) + logger.error(f"Unexpected error in telegram_webhook: {e}") + return JsonResponse({"status": "ok", "error": str(e)}, status=200) diff --git a/pxy_neo4j/neo4j_connector.py b/pxy_neo4j/neo4j_connector.py index 7c4a530..4d2078f 100644 --- a/pxy_neo4j/neo4j_connector.py +++ b/pxy_neo4j/neo4j_connector.py @@ -9,10 +9,9 @@ from datetime import datetime from langchain.prompts import ChatPromptTemplate from langchain_community.chains.graph_qa.cypher import GraphCypherQAChain - logger = logging.getLogger(__name__) -DEFAULT_PROFILE_NAME = "DefaultNeo4jProfile" # Define a default profile +DEFAULT_PROFILE_NAME = "DefaultNeo4jProfile" class Neo4jDatabase: """ @@ -28,84 +27,99 @@ class Neo4jDatabase: logger.warning("No profile specified. Using default Neo4j profile.") self.profile = Neo4jProfile.objects.get(name=DEFAULT_PROFILE_NAME) - # Connect to the assigned Neo4j instance - self.graph = Neo4jGraph( - url=self.profile.uri, - username=self.profile.username, - password=self.profile.password - ) + # Attempt to connect to the assigned Neo4j instance + try: + self.graph = Neo4jGraph( + url=self.profile.uri, + username=self.profile.username, + password=self.profile.password + ) + except Exception as e: + logger.error(f"Failed to initialize Neo4jGraph: {e}") + self.graph = None - # Load the correct AI model - self.llm = ChatOpenAI( - api_key=self.profile.openai_api_key, - model_name=self.profile.model_name, - temperature=0 - ) + # Initialize LLM if graph is available + try: + self.llm = ChatOpenAI( + api_key=self.profile.openai_api_key, + model_name=self.profile.model_name, + temperature=0 + ) + except Exception as e: + logger.error(f"Failed to initialize ChatOpenAI: {e}") + self.llm = None - # Add the custom prompt to enforce English - english_prompt = ChatPromptTemplate.from_template( - "Transform the following text into a graph structure. All nodes, relationships, and properties should be in English, regardless of the original language." - ) - - # Apply the custom English prompt here - self.graph_transformer = LLMGraphTransformer( - llm=self.llm, - prompt=english_prompt - ) + # Prepare graph transformer only if llm available + if self.llm: + english_prompt = ChatPromptTemplate.from_template( + "Transform the following text into a graph structure. All nodes, relationships, and properties should be in English, regardless of the original language." + ) + try: + self.graph_transformer = LLMGraphTransformer( + llm=self.llm, + prompt=english_prompt + ) + except Exception as e: + logger.error(f"Failed to initialize LLMGraphTransformer: {e}") + self.graph_transformer = None + else: + self.graph_transformer = None logger.info(f"Neo4jDatabase initialized with profile: {self.profile.name}") except Neo4jProfile.DoesNotExist: logger.error(f"Neo4j profile '{profile_name}' not found.") - raise + self.graph = None + self.llm = None + self.graph_transformer = None except Exception as e: - logger.error(f"Failed to initialize Neo4jDatabase: {str(e)}") - raise + logger.error(f"Unexpected error initializing Neo4jDatabase: {e}") + self.graph = None + self.llm = None + self.graph_transformer = None def store_interaction(self, user_id, bot_id, user_message, bot_response, platform): """ Stores a chatbot interaction as a structured graph in Neo4j. Converts messages into `Document` objects to work with `LLMGraphTransformer`. """ + if not self.graph_transformer: + logger.warning("Graph transformer not available, skipping store_interaction.") + return + try: - timestamp = datetime.utcnow().isoformat() - - # Convert messages into `Document` objects documents = [ Document(page_content=user_message, metadata={"role": "user", "user_id": user_id, "platform": platform, "created_at": timestamp}), Document(page_content=bot_response, metadata={"role": "bot", "bot_id": bot_id, "platform": platform, "created_at": timestamp}), ] - - # Convert text into structured graph documents graph_docs = self.graph_transformer.convert_to_graph_documents(documents) - - # Store the structured graph data into Neo4j - self.graph.add_graph_documents(graph_docs, include_source=True) - - logger.info(f"Stored interaction in Neo4j (Profile: {self.profile.name})") + if self.graph: + self.graph.add_graph_documents(graph_docs, include_source=True) + logger.info(f"Stored interaction in Neo4j (Profile: {self.profile.name})") + else: + logger.warning("No Neo4j graph available, skipping add_graph_documents.") except Exception as e: - logger.error(f"Failed to store interaction in Neo4j: {str(e)}") - raise + logger.error(f"Failed to store interaction in Neo4j: {e}") def query_graph(self, user_query): """ Queries the graph using GraphCypherQAChain and returns a structured response. """ - logger.info(f"Calling Neo4j {user_query}") + if not self.graph or not self.llm: + logger.warning("Graph or LLM not available, cannot query.") + return None + try: - # Use AI model to generate Cypher query qa_chain = GraphCypherQAChain.from_llm( llm=self.llm, graph=self.graph, verbose=True, allow_dangerous_requests=True ) - result = qa_chain.invoke({"query": user_query}) - logger.info(f"Resulting Neo4j {result}") - return result['result'] - + logger.info(f"Resulting Neo4j query result: {result}") + return result.get('result') except Exception as e: logger.error(f"Graph query failed: {e}") return None diff --git a/tests/fake_telegram_update.json b/tests/fake_telegram_update.json new file mode 100644 index 0000000..c14506c --- /dev/null +++ b/tests/fake_telegram_update.json @@ -0,0 +1,21 @@ +{ + "update_id": 100000000, + "message": { + "message_id": 1, + "from": { + "id": 123456789, + "is_bot": false, + "first_name": "Tester", + "username": "tester_user" + }, + "chat": { + "id": 123456789, + "first_name": "Tester", + "username": "tester_user", + "type": "private" + }, + "date": 1700000000, + "text": "Hola desde local!" + } +} + diff --git a/tests/test_telegram_webhook.sh b/tests/test_telegram_webhook.sh new file mode 100755 index 0000000..82b1f7b --- /dev/null +++ b/tests/test_telegram_webhook.sh @@ -0,0 +1,59 @@ +#!/usr/bin/env bash +set -euo pipefail + +# === Configuración === +BOT_NAME="PolisplexityBot" +HOST="localhost" +PORT="8011" # Puerto donde corre tu servidor Django (devserver o Docker mapped) +WEBHOOK_URL="http://${HOST}:${PORT}/bots/webhook/${BOT_NAME}/" +PAYLOAD_FILE="$(dirname "$0")/fake_telegram_update.json" + +# === Ejecución === +echo "=== Test Telegram Webhook ===" +echo "Bot: $BOT_NAME" +echo "URL: $WEBHOOK_URL" +echo "Payload: $PAYLOAD_FILE" +echo + +echo "1) Mostrando contenido del payload:" +cat "$PAYLOAD_FILE" +echo + +echo "2) Enviando payload al webhook (verbose):" +# Archivo temporal para capturar body +BODY_FILE=$(mktemp) +# Enviamos con curl -v para depuración, guardamos respuesta en BODY_FILE y código en STATUS_CODE +STATUS_CODE=$(curl -v \ + -X POST \ + -H "Content-Type: application/json" \ + --data @"$PAYLOAD_FILE" \ + -o "$BODY_FILE" \ + -w "%{http_code}" \ + "$WEBHOOK_URL" || true) + +echo + echo "→ HTTP Status: $STATUS_CODE" + echo "→ Response Body:" + cat "$BODY_FILE" + echo + +if [[ "$STATUS_CODE" != "200" ]]; then + echo "❌ ERROR: el webhook devolvió código $STATUS_CODE" + exit 1 +fi + +echo "3) Verificando en base de datos dentro del contenedor Docker..." +# Usamos docker exec para acceder al entorno con dependencias instaladas +docker exec -i polisplexity_django python manage.py shell << 'EOF' +from pxy_bots.models import TelegramConversation, TelegramMessage + +print("Conversations:", TelegramConversation.objects.count()) +print("Messages: ", TelegramMessage.objects.count()) +for m in TelegramMessage.objects.all(): + print(f" • [{m.direction:3}] (conv={m.conversation_id}) rt={m.response_time_ms or 0} ms ⇒ {m.content[:50]!r}") +EOF + +# Limpieza temporal +rm -f "$BODY_FILE" + +echo "✅ Test completado"