Ekaropolus d52dfe75a2
All checks were successful
continuous-integration/drone/push Build is passing
Again refactor
2025-07-22 21:24:12 -06:00

293 lines
12 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import requests
from pxy_openai.assistants import OpenAIAssistant as OpenAIService # Import the assistant service
import logging
from .models import FacebookPageAssistant
from django.core.exceptions import ObjectDoesNotExist
from pxy_neo4j.neo4j_connector import Neo4jDatabase
logger = logging.getLogger(__name__)
class FacebookService:
"""
A service to interact with the Facebook Graph API.
"""
def __init__(self, user_access_token, facebook_api_version="v22.0"):
self.user_access_token = user_access_token
self.facebook_api_version = facebook_api_version
self.base_url = f"https://graph.facebook.com/{self.facebook_api_version}"
self.neo4j_db = Neo4jDatabase() # Initialize Neo4j connection
def reply_to_comment(self, page_id, comment_id, message):
"""
Replies to a specific comment using the Facebook API and OpenAI Assistant.
"""
# Retrieve the Page Access Token dynamically
page_access_token = self._get_page_access_token(page_id)
if not page_access_token:
logger.error(f"Unable to retrieve access token for page ID: {page_id}")
return None
# Fetch the appropriate OpenAI assistant for the page
try:
page_assistant = FacebookPageAssistant.objects.get(page_id=page_id)
openai_assistant_model = page_assistant.assistant
logger.info(f"Using assistant '{openai_assistant_model.name}' for page '{page_assistant.page_name}'")
except ObjectDoesNotExist:
logger.error(f"No assistant configured for page ID: {page_id}")
return None
# Use a default message if the received message is empty
if not message or message.strip() == "":
message = "Thank you for sharing this comment! What do you think about it?"
openai_service = OpenAIService(name=openai_assistant_model.name) # Pass the model's name to the service
bot_response = openai_service.handle_message(message)
# Send the response to Facebook
url = f"{self.base_url}/{comment_id}/comments"
payload = {"message": bot_response, "access_token": page_access_token}
try:
response = requests.post(url, data=payload)
response.raise_for_status()
logger.info(f"Replied to comment ID: {comment_id}")
# Store the interaction in Neo4j
self.neo4j_db.store_interaction(
user_id=f"fb_user_{comment_id}",
bot_id=f"fb_bot_{page_id}",
user_message=message,
bot_response=bot_response,
platform="Facebook"
)
return response.json()
except requests.exceptions.RequestException as e:
logger.error(f"Failed to reply to comment ID: {comment_id}. Error: {e}")
return None
def post_comment_on_share(self, page_id, post_id, message, sender_id=None):
"""
Posts a comment on a shared post and then on the original post.
"""
# 1) Fetch token
page_access_token = self._get_page_access_token(page_id)
if not page_access_token:
logger.error(f"Unable to retrieve access token for page ID: {page_id}")
return None
# 2) Fetch details
post_details = self._get_post_details(post_id, page_access_token)
if not post_details:
logger.error(f"Failed to retrieve post details for post ID: {post_id}")
return None
description = post_details.get("parent_message")
parent_id = post_details.get("parent_id")
author_page_id = post_details.get("parent_from_id")
# 3) Load assistant
try:
page_assistant = FacebookPageAssistant.objects.get(page_id=page_id)
openai_assistant_model = page_assistant.assistant
except ObjectDoesNotExist:
logger.error(f"No assistant configured for page ID: {page_id}")
return None
# 4) Build prompt & get AI response
prompt = self._build_prompt(message, description)
openai_svc = OpenAIService(name=openai_assistant_model.name)
bot_response = openai_svc.handle_message(prompt)
# 5) Post on shared post
shared = self._post_facebook_comment(
post_id,
bot_response,
page_access_token,
sender_id=author_page_id
)
if shared:
self._store_interaction(
page_id,
user_message=description or "Shared post comment",
bot_response=bot_response,
platform="Facebook"
)
# 6) Post on original post
if parent_id and shared:
orig = self._post_facebook_comment(parent_id, bot_response, page_access_token)
if orig:
self._store_interaction(
page_id,
user_message=description or "Original post comment",
bot_response=bot_response,
platform="Facebook (Original Post)"
)
return shared
# ─── Helper: prompt builder ────────────────────────────────────────────
def _build_prompt(self, message: str, description: str) -> str:
"""
Generate the same prompt logic you had inline.
"""
if not message or message.strip() == "":
if description:
return (
f"Dr. Dr. Ekaropolus previously said: '{description}'. "
"Based on this, write an insightful response in the most appropriate language that engages people in scientific discussion."
)
else:
return "Say something truly inspiring about science, a fact or idea that will amaze people."
else:
if description:
return (
f"Dr. Dr. Ekaropolus previously said: '{message}', "
f"and the shared post describes: '{description}'. "
"Combine these thoughts into an engaging, fun, and insightful response in the most appropriate language."
)
else:
return f"Dr. Dr. Ekaropolus said: '{message}'. Expand on this with an insightful scientific thought."
def _store_interaction(
self,
page_id: str,
user_message: str,
bot_response: str,
platform: str
) -> None:
"""
Encapsulates the repeated Neo4j store_interaction calls.
"""
try:
self.neo4j_db.store_interaction(
user_id = f"fb_bot_{page_id}",
bot_id = f"fb_bot_{page_id}",
user_message = user_message,
bot_response = bot_response,
platform = platform
)
except Exception as e:
logger.error(f"Error logging interaction for page {page_id}: {e}")
def get_system_user_id(self):
"""
Retrieves the system user ID using the user access token.
"""
try:
return '122106889202727657'
except requests.exceptions.RequestException as e:
logger.error(f"Error fetching system user ID: {e}")
return None
def _get_page_access_token(self, page_id):
"""
Retrieves the Page Access Token for a specific Page ID.
"""
url = f"{self.base_url}/122106889202727657/accounts?access_token={self.user_access_token}"
try:
response = requests.get(url)
response.raise_for_status()
data = response.json()
if "data" in data:
for page in data["data"]:
if page.get("id") == str(page_id):
page_name = page.get("name", "Unknown")
access_token = page.get("access_token", "No Token")
logger.info(f"Retrieved access token for page {page_id}: {page_name}")
return access_token
logger.error(f"Error: Page ID {page_id} not found.")
else:
logger.error("Error: Unexpected response format from Facebook API.")
except requests.exceptions.RequestException as e:
logger.error(f"Error fetching Page Access Token: {e}")
return None
def _get_post_details(self, post_id, access_token):
"""
Retrieves details of a post, including:
- message (the posts own text)
- description (from attachments)
- parent_id (if its a share)
- from_id (author of THIS post)
- parent_from_id (author of the ORIGINAL post, if shared)
- parent_message (text of the ORIGINAL post, if shared)
"""
try:
# 1st call: get this posts text, description, parent_id, and author
fields = (
"message,"
"attachments.limit(10){description,media,media_type,target,url},"
"parent_id,from"
)
url = f"{self.base_url}/{post_id}?fields={fields}&access_token={access_token}"
resp = requests.get(url)
resp.raise_for_status()
data = resp.json()
message = data.get("message", "")
attachments = data.get("attachments", {}).get("data", [{}])
description = attachments[0].get("description", "") if attachments else ""
parent_id = data.get("parent_id")
from_id = data.get("from", {}).get("id")
# Defaults if there is no parent
parent_from_id = None
parent_message = None
# 2nd call: if this is a share, fetch the ORIGINAL posts author and text
if parent_id:
parent_fields = "from,message"
parent_url = f"{self.base_url}/{parent_id}?fields={parent_fields}&access_token={access_token}"
p_resp = requests.get(parent_url)
p_resp.raise_for_status()
p_data = p_resp.json()
parent_from_id = p_data.get("from", {}).get("id")
parent_message = p_data.get("message", "")
return {
"message": message,
"description": description,
"parent_id": parent_id,
"from_id": from_id,
"parent_from_id": parent_from_id,
"parent_message": parent_message,
}
except requests.exceptions.RequestException as e:
logger.error(f"Failed to fetch post details for {post_id}: {e}")
return {}
def _post_facebook_comment(self, post_id, message, access_token, sender_id=None):
"""
Helper function to post a comment to a specific post.
"""
# Build the “mention” link (your Page)
prefix = f"https://www.facebook.com/{sender_id}" if sender_id else ""
# WhatsApp CTA link
whatsapp_link = "https://wa.me/447887147696"
# Assemble the final message with labels and spacing
formatted = "\n\n".join(filter(None, [
prefix,
message.strip(),
f"📱 Chat with us on WhatsApp: {whatsapp_link}"
]))
url = f"{self.base_url}/{post_id}/comments"
payload = {"message": formatted, "access_token": access_token}
try:
response = requests.post(url, data=payload)
response.raise_for_status()
logger.info(f"Posted a comment on post ID: {post_id}")
return response.json()
except requests.exceptions.RequestException as e:
logger.error(f"Failed to comment on post ID: {post_id}. Error: {e}")
return None