Ekaropolus 8f10aebfa2
All checks were successful
continuous-integration/drone/push Build is passing
Telegram Bot configurable with handlers
2025-09-16 22:23:05 -06:00

184 lines
7.0 KiB
Python

import json
import requests
from django.db import models
from django.core.validators import MinValueValidator
from django.utils import timezone
from pxy_langchain.models import AIAssistant # LangChain assistant
# Telegram bot + simple conversation log
class TelegramBot(models.Model):
    """A Telegram bot and the LangChain assistant that backs it.

    Each row stores the bot's display name, Telegram username and API token,
    plus an ``is_active`` switch. Deleting the linked ``AIAssistant`` deletes
    the bot as well (``on_delete=CASCADE``).
    """

    name = models.CharField(max_length=50, unique=True, help_text="Bot name (e.g., 'SupportBot').")
    username = models.CharField(max_length=50, unique=True, help_text="Bot username (e.g., 'SupportBot').")
    token = models.CharField(max_length=200, unique=True, help_text="Telegram bot token.")
    is_active = models.BooleanField(default=True, help_text="If off, webhook can be refused.")
    assistant = models.ForeignKey(
        AIAssistant,
        on_delete=models.CASCADE,
        related_name="telegram_bots",
        help_text="LangChain AI assistant associated with this bot.",
    )
    created_at = models.DateTimeField(default=timezone.now, editable=False)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        ordering = ["name"]

    def __str__(self):
        return f"{self.name} (@{self.username})"

    @staticmethod
    def get_bot_token(bot_name: str) -> str:
        """Return the API token of the active bot called ``bot_name``.

        Raises:
            ValueError: if no bot with that name exists or it is inactive.
                The original ``DoesNotExist`` is attached as ``__cause__``.
        """
        try:
            bot = TelegramBot.objects.get(name=bot_name, is_active=True)
        except TelegramBot.DoesNotExist as exc:
            # Chain explicitly so the traceback shows this is a translated
            # lookup failure, not an error raised while handling another one.
            raise ValueError(f"Bot with name '{bot_name}' not found or inactive.") from exc
        return bot.token

    def set_webhook(self, base_url: str) -> dict:
        """Register ``<base_url>/bots/webhook/<name>/`` as this bot's webhook.

        Args:
            base_url: Public base URL of this deployment; a trailing slash is
                ignored.

        Returns:
            The decoded JSON body of Telegram's ``setWebhook`` response.

        Raises:
            ValueError: if the bot is inactive.
            requests.HTTPError: if Telegram answers with a 4xx/5xx status.
        """
        from urllib.parse import quote

        if not self.is_active:
            raise ValueError(f"Bot '{self.name}' is inactive. Activate it before setting the webhook.")

        # The name is free text (CharField), so percent-encode it: spaces or
        # reserved characters would otherwise yield an invalid webhook URL.
        # Plain alphanumeric names are unchanged by quote().
        name_segment = quote(self.name, safe="")
        webhook_url = f"{base_url.rstrip('/')}/bots/webhook/{name_segment}/"

        # NOTE: the request URL embeds the bot token, and the HTTPError raised
        # by raise_for_status() includes that URL in its message -- avoid
        # logging that exception text verbatim.
        resp = requests.post(
            f"https://api.telegram.org/bot{self.token}/setWebhook",
            data={"url": webhook_url},
            timeout=5,
        )
        resp.raise_for_status()
        return resp.json()
class TelegramConversation(models.Model):
    """A conversation between one ``user_id`` and one bot.

    Rows are ordered newest-first by ``started_at``, which is set
    automatically when the row is created.
    """

    bot = models.ForeignKey(
        TelegramBot,
        on_delete=models.CASCADE,
        related_name="conversations",
    )
    user_id = models.CharField(max_length=64)
    started_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        ordering = ["-started_at"]

    def __str__(self):
        # Minute resolution is enough for a human-readable label.
        started_label = format(self.started_at, "%Y-%m-%d %H:%M")
        return f"{self.user_id} @ {started_label}"
class TelegramMessage(models.Model):
    """One message in a conversation, tagged with its direction.

    Messages are ordered by ``timestamp`` and then ``id``, so rows with equal
    timestamps keep a stable order.
    """

    # Stored values for ``direction``.
    IN = "in"
    OUT = "out"
    DIRECTION_CHOICES = [
        (IN, "In"),
        (OUT, "Out"),
    ]

    conversation = models.ForeignKey(
        TelegramConversation,
        on_delete=models.CASCADE,
        related_name="messages",
    )
    direction = models.CharField(max_length=4, choices=DIRECTION_CHOICES)
    content = models.TextField()
    timestamp = models.DateTimeField(auto_now_add=True)
    # Optional; may be left empty (NULL).
    response_time_ms = models.IntegerField(null=True, blank=True)

    class Meta:
        ordering = ["timestamp", "id"]

    def __str__(self):
        # Show only the first 30 characters of the content as a preview.
        preview = self.content[:30]
        return f"[{self.direction}] {preview}"
# Configurable routing (Admin-driven)
class Connection(models.Model):
    """Admin-configured HTTP endpoint that command routes can forward to.

    Holds the base URL, an optional default path, authentication settings,
    extra headers, a timeout and a host allowlist. The helper methods turn
    the stored text fields into ready-to-use Python structures.
    """

    AUTH_NONE = "none"
    AUTH_BEARER = "bearer"
    AUTH_API_KEY = "api_key"
    AUTH_BASIC = "basic"
    AUTH_CHOICES = [
        (AUTH_NONE, "None"),
        (AUTH_BEARER, "Bearer token"),
        (AUTH_API_KEY, "API key (in header)"),
        (AUTH_BASIC, "Basic user:pass"),
    ]

    name = models.CharField(max_length=120, unique=True)
    base_url = models.CharField(max_length=500, help_text="e.g. https://api.example.com")
    path_default = models.CharField(max_length=300, blank=True, default="", help_text="Optional default path, e.g. /bots/route")
    auth_type = models.CharField(max_length=20, choices=AUTH_CHOICES, default=AUTH_NONE)
    auth_value = models.CharField(max_length=500, blank=True, default="", help_text="token | key | user:pass")
    headers_json = models.TextField(blank=True, default="", help_text='Extra headers as JSON object')
    timeout_s = models.PositiveIntegerField(default=4, validators=[MinValueValidator(1)])
    allowed_hosts = models.CharField(
        max_length=800,
        blank=True,
        default="127.0.0.1,localhost,app.polisplexity.tech",
        help_text="Comma-separated host allowlist for safety."
    )
    is_active = models.BooleanField(default=True)
    created_at = models.DateTimeField(default=timezone.now, editable=False)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        ordering = ["name"]

    def __str__(self):
        return self.name

    def allowed_host_set(self):
        """Return ``allowed_hosts`` as a set of trimmed, lower-cased host names.

        Empty entries (e.g. from a trailing comma) are dropped; an empty or
        missing field yields an empty set.
        """
        return {h.strip().lower() for h in (self.allowed_hosts or "").split(",") if h.strip()}

    def extra_headers(self):
        """Return ``headers_json`` parsed into a dict, or ``{}`` if unusable.

        Best-effort by design: a blank field, malformed JSON, or JSON that is
        not an object all result in an empty dict instead of an error.
        """
        if not self.headers_json:
            return {}
        try:
            obj = json.loads(self.headers_json)
        except ValueError:
            # json.JSONDecodeError is a ValueError subclass; malformed input
            # simply means "no extra headers".
            return {}
        return obj if isinstance(obj, dict) else {}

    def auth_headers(self):
        """Build the authentication header(s) for this connection.

        Returns:
            ``{}`` when ``auth_type`` is "none"/unknown or ``auth_value`` is
            empty; otherwise a one-entry dict:

            * bearer  -> ``Authorization: Bearer <auth_value>``
            * api_key -> ``X-API-Key: <auth_value>``
            * basic   -> ``Authorization: Basic <base64(user:pass)>``
        """
        import base64

        if not self.auth_value:
            return {}

        if self.auth_type == self.AUTH_BEARER:
            return {"Authorization": f"Bearer {self.auth_value}"}

        if self.auth_type == self.AUTH_API_KEY:
            # Header name is a convention; adjust if the target API differs.
            return {"X-API-Key": self.auth_value}

        if self.auth_type == self.AUTH_BASIC:
            credentials = self.auth_value
            # HTTP Basic requires base64("user:pass"). The field stores the
            # raw "user:pass" pair, so encode it here. A value without ":"
            # cannot be a raw pair (and ":" is not in the base64 alphabet),
            # so it is treated as already encoded and passed through as-is.
            if ":" in credentials:
                credentials = base64.b64encode(credentials.encode("utf-8")).decode("ascii")
            return {"Authorization": f"Basic {credentials}"}

        return {}
class CommandRoute(models.Model):
    """Maps a bot trigger (and optional command name) to a ``Connection``.

    Routes are ordered by ``priority`` (lower first) and then ``id``. A
    ``Connection`` that still has routes cannot be deleted
    (``on_delete=PROTECT``); deleting the bot deletes its routes.
    """

    TRIG_MESSAGE = "message"
    TRIG_TEXTCMD = "text_command"
    TRIG_CALLBACK = "callback"
    TRIGGER_CHOICES = [
        (TRIG_MESSAGE, "Message (no command)"),
        (TRIG_TEXTCMD, "Text command (/cmd)"),
        (TRIG_CALLBACK, "Callback"),
    ]

    bot = models.ForeignKey("pxy_bots.TelegramBot", on_delete=models.CASCADE, related_name="routes")
    trigger = models.CharField(max_length=20, choices=TRIGGER_CHOICES, default=TRIG_MESSAGE)
    command_name = models.CharField(
        max_length=80, blank=True, null=True,
        help_text="Without leading '/'. Leave blank for default of that trigger."
    )
    connection = models.ForeignKey(Connection, on_delete=models.PROTECT, related_name="routes")
    path = models.CharField(max_length=300, blank=True, default="", help_text="Overrides connection.path_default if set")
    enabled = models.BooleanField(default=True)
    priority = models.PositiveIntegerField(default=100, help_text="Lower runs first")
    note = models.CharField(max_length=240, blank=True, default="")
    created_at = models.DateTimeField(default=timezone.now, editable=False)
    updated_at = models.DateTimeField(auto_now=True)

    class Meta:
        ordering = ["priority", "id"]
        indexes = [
            models.Index(fields=["bot", "trigger", "command_name", "enabled", "priority"]),
        ]

    def __str__(self):
        cmd = self.command_name or "(default)"
        # Separate the command from the target connection; without the arrow
        # the two names run together (e.g. "(default)MyConnection").
        return f"{self.bot.name} · {self.trigger} · {cmd} → {self.connection.name}"

    def clean(self):
        """Normalize ``command_name`` before validation/saving.

        Surrounding whitespace and leading slashes are removed and the name
        is lower-cased. If nothing remains (e.g. the input was ``"/"`` or
        blank), the value becomes ``None`` so every "default route" is stored
        the same way on this nullable field.
        """
        super().clean()
        if self.command_name is not None:
            normalized = self.command_name.strip().lstrip("/").strip().lower()
            self.command_name = normalized or None