166 lines
6.9 KiB
Python
166 lines
6.9 KiB
Python
# pxy_bots/renderer.py
|
|
import json
|
|
import logging
|
|
from typing import Dict, List, Optional
|
|
|
|
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Bot, Message
|
|
from telegram.error import TelegramError
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _build_keyboard(buttons: Optional[List[dict]]) -> Optional[InlineKeyboardMarkup]:
|
|
"""
|
|
Build an InlineKeyboardMarkup from a list of button specs.
|
|
|
|
Supported kinds:
|
|
- open_url: {"label":"...", "kind":"open_url", "url":"https://..."}
|
|
- callback_api: {"label":"...", "kind":"callback_api", "action":"rerun",
|
|
"params": {...}, "state_token":"..."}
|
|
"""
|
|
if not buttons:
|
|
return None
|
|
|
|
rows: List[List[InlineKeyboardButton]] = []
|
|
current_row: List[InlineKeyboardButton] = []
|
|
|
|
for b in buttons:
|
|
kind = (b.get("kind") or "").lower()
|
|
label = b.get("label") or "…"
|
|
|
|
if kind == "open_url":
|
|
url = b.get("url")
|
|
if not url:
|
|
logger.warning("renderer: open_url without url; skipping")
|
|
continue
|
|
btn = InlineKeyboardButton(text=label, url=url)
|
|
|
|
elif kind == "callback_api":
|
|
# Keep callback_data tiny; prefer server-issued state_token.
|
|
if b.get("state_token"):
|
|
data = {"t": b["state_token"]} # compact key
|
|
else:
|
|
data = {"a": b.get("action"), "p": b.get("params") or {}}
|
|
try:
|
|
payload = json.dumps(data, separators=(",", ":"), ensure_ascii=False)
|
|
except Exception:
|
|
payload = '{"e":"bad"}'
|
|
# Telegram recommends <=64 bytes for callback_data
|
|
if len(payload.encode("utf-8")) > 64:
|
|
logger.warning("renderer: callback_data too long (%sB); trimming",
|
|
len(payload.encode("utf-8")))
|
|
payload = payload.encode("utf-8")[:64].decode("utf-8", errors="ignore")
|
|
btn = InlineKeyboardButton(text=label, callback_data=payload)
|
|
|
|
else:
|
|
logger.warning("renderer: unknown button kind=%s; skipping", kind)
|
|
continue
|
|
|
|
current_row.append(btn)
|
|
if len(current_row) >= 3:
|
|
rows.append(current_row)
|
|
current_row = []
|
|
|
|
if current_row:
|
|
rows.append(current_row)
|
|
|
|
return InlineKeyboardMarkup(rows) if rows else None
|
|
|
|
|
|
async def render_spec(*, bot: Bot, chat_id: int, spec: Dict) -> List[Message]:
|
|
"""
|
|
Send messages according to a render_spec:
|
|
{
|
|
"messages": [
|
|
{"type":"text","text":"..."},
|
|
{"type":"photo","media_url":"https://...","caption":"..."}
|
|
],
|
|
"buttons":[ ... ], # optional top-level buttons (attached to the LAST message)
|
|
"telemetry": {"run_id":"...", "cache_ttl_s":600},
|
|
"schema_version": "render.v1"
|
|
}
|
|
|
|
Returns the list of telegram.Message objects sent.
|
|
"""
|
|
msgs = spec.get("messages") or []
|
|
top_buttons = spec.get("buttons") or None
|
|
sent: List[Message] = []
|
|
|
|
for i, m in enumerate(msgs):
|
|
mtype = (m.get("type") or "text").lower()
|
|
kb = _build_keyboard(m.get("buttons") or (top_buttons if i == len(msgs) - 1 else None))
|
|
|
|
try:
|
|
if mtype == "text":
|
|
text = m.get("text") or ""
|
|
msg = await bot.send_message(chat_id=chat_id, text=text, reply_markup=kb)
|
|
sent.append(msg)
|
|
|
|
elif mtype == "photo":
|
|
file_id = m.get("file_id")
|
|
media_url = m.get("media_url")
|
|
caption = m.get("caption") or None
|
|
if not (file_id or media_url):
|
|
logger.warning("renderer: photo without file_id/media_url; skipping")
|
|
continue
|
|
try:
|
|
msg = await bot.send_photo(chat_id=chat_id,
|
|
photo=file_id or media_url,
|
|
caption=caption,
|
|
reply_markup=kb)
|
|
sent.append(msg)
|
|
except TelegramError as te:
|
|
# Typical: "BadRequest: Wrong type of the web page content"
|
|
logger.exception("renderer.photo_error send_photo err=%s url=%s", te, media_url)
|
|
# Fallback 1: try as document (Telegram is more permissive)
|
|
try:
|
|
msg = await bot.send_document(chat_id=chat_id,
|
|
document=file_id or media_url,
|
|
caption=caption,
|
|
reply_markup=kb)
|
|
sent.append(msg)
|
|
except TelegramError as te2:
|
|
logger.exception("renderer.photo_fallback_doc_error err=%s url=%s", te2, media_url)
|
|
# Fallback 2: plain text with link
|
|
fallback_text = (caption + "\n" if caption else "") + (media_url or "")
|
|
if fallback_text.strip():
|
|
try:
|
|
msg = await bot.send_message(chat_id=chat_id,
|
|
text=fallback_text,
|
|
reply_markup=kb)
|
|
sent.append(msg)
|
|
except TelegramError as te3:
|
|
logger.exception("renderer.photo_fallback_text_error err=%s url=%s", te3, media_url)
|
|
|
|
elif mtype == "document":
|
|
file_id = m.get("file_id")
|
|
media_url = m.get("media_url")
|
|
caption = m.get("caption") or None
|
|
if not (file_id or media_url):
|
|
logger.warning("renderer: document without file_id/media_url; skipping")
|
|
continue
|
|
msg = await bot.send_document(chat_id=chat_id, document=file_id or media_url,
|
|
caption=caption, reply_markup=kb)
|
|
sent.append(msg)
|
|
|
|
elif mtype == "video":
|
|
file_id = m.get("file_id")
|
|
media_url = m.get("media_url")
|
|
caption = m.get("caption") or None
|
|
if not (file_id or media_url):
|
|
logger.warning("renderer: video without file_id/media_url; skipping")
|
|
continue
|
|
msg = await bot.send_video(chat_id=chat_id, video=file_id or media_url,
|
|
caption=caption, reply_markup=kb)
|
|
sent.append(msg)
|
|
|
|
else:
|
|
logger.warning("renderer: unsupported message type=%s; skipping", mtype)
|
|
|
|
except TelegramError as te:
|
|
logger.exception("renderer.telegram_error type=%s err=%s", mtype, te)
|
|
except Exception as e:
|
|
logger.exception("renderer.unexpected type=%s err=%s", mtype, e)
|
|
|
|
return sent
|