132 lines
5.0 KiB
Python
132 lines
5.0 KiB
Python
# pxy_bots/renderer.py
|
||
import json
|
||
import logging
|
||
from typing import Dict, List, Optional
|
||
|
||
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Bot, Message
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
def _build_keyboard(buttons: Optional[List[dict]]) -> Optional[InlineKeyboardMarkup]:
|
||
"""
|
||
Build an InlineKeyboardMarkup from a list of button specs.
|
||
|
||
Supported kinds:
|
||
- open_url: {"label": "...", "kind": "open_url", "url": "https://..."}
|
||
- callback_api: {"label": "...", "kind": "callback_api", "action": "rerun", "params": {...}, "state_token": "..."}
|
||
"""
|
||
if not buttons:
|
||
return None
|
||
|
||
rows: List[List[InlineKeyboardButton]] = []
|
||
current_row: List[InlineKeyboardButton] = []
|
||
|
||
for b in buttons:
|
||
kind = (b.get("kind") or "").lower()
|
||
label = b.get("label") or "…"
|
||
|
||
if kind == "open_url":
|
||
url = b.get("url")
|
||
if not url:
|
||
logger.warning("renderer: open_url without url; skipping button")
|
||
continue
|
||
btn = InlineKeyboardButton(text=label, url=url)
|
||
elif kind == "callback_api":
|
||
# Keep callback_data small. Prefer server-provided state_token when available.
|
||
data = {}
|
||
if b.get("state_token"):
|
||
data = {"t": b["state_token"]} # compact key to minimize size
|
||
else:
|
||
# fallback: tiny payload (risk hitting 64B limit if it grows)
|
||
data = {"a": b.get("action"), "p": b.get("params") or {}}
|
||
try:
|
||
payload = json.dumps(data, separators=(",", ":"), ensure_ascii=False)
|
||
except Exception:
|
||
payload = '{"e":"bad"}'
|
||
# Telegram limit: 1–64 bytes; try to keep it tiny
|
||
if len(payload.encode("utf-8")) > 64:
|
||
logger.warning("renderer: callback_data too long (%s bytes); trimming", len(payload.encode("utf-8")))
|
||
payload = payload.encode("utf-8")[:64].decode("utf-8", errors="ignore")
|
||
btn = InlineKeyboardButton(text=label, callback_data=payload)
|
||
else:
|
||
logger.warning("renderer: unknown button kind=%s; skipping", kind)
|
||
continue
|
||
|
||
current_row.append(btn)
|
||
if len(current_row) >= 3:
|
||
rows.append(current_row)
|
||
current_row = []
|
||
|
||
if current_row:
|
||
rows.append(current_row)
|
||
|
||
return InlineKeyboardMarkup(rows) if rows else None
|
||
|
||
|
||
async def render_spec(*, bot: Bot, chat_id: int, spec: Dict) -> List[Message]:
|
||
"""
|
||
Send messages according to a render_spec:
|
||
{
|
||
"messages": [
|
||
{"type":"text","text":"..."},
|
||
{"type":"photo","media_url":"https://...","caption":"..."}
|
||
],
|
||
"buttons":[ ... ], # optional top-level buttons (attached to the LAST message)
|
||
"telemetry": {"run_id":"...", "cache_ttl_s": 600},
|
||
"schema_version": "render.v1"
|
||
}
|
||
|
||
Returns the list of telegram.Message objects sent.
|
||
"""
|
||
msgs = spec.get("messages") or []
|
||
top_buttons = spec.get("buttons") or None
|
||
sent: List[Message] = []
|
||
|
||
for i, m in enumerate(msgs):
|
||
mtype = (m.get("type") or "text").lower()
|
||
|
||
# Per-message buttons override top-level
|
||
kb = _build_keyboard(m.get("buttons") or (top_buttons if i == len(msgs) - 1 else None))
|
||
|
||
if mtype == "text":
|
||
text = m.get("text") or ""
|
||
msg = await bot.send_message(chat_id=chat_id, text=text, reply_markup=kb)
|
||
sent.append(msg)
|
||
|
||
elif mtype == "photo":
|
||
# Prefer file_id if provided, else media_url (Telegram can fetch by URL)
|
||
file_id = m.get("file_id")
|
||
media_url = m.get("media_url")
|
||
caption = m.get("caption") or None
|
||
if not (file_id or media_url):
|
||
logger.warning("renderer: photo without file_id/media_url; skipping")
|
||
continue
|
||
msg = await bot.send_photo(chat_id=chat_id, photo=file_id or media_url, caption=caption, reply_markup=kb)
|
||
sent.append(msg)
|
||
|
||
elif mtype == "document":
|
||
file_id = m.get("file_id")
|
||
media_url = m.get("media_url")
|
||
caption = m.get("caption") or None
|
||
if not (file_id or media_url):
|
||
logger.warning("renderer: document without file_id/media_url; skipping")
|
||
continue
|
||
msg = await bot.send_document(chat_id=chat_id, document=file_id or media_url, caption=caption, reply_markup=kb)
|
||
sent.append(msg)
|
||
|
||
elif mtype == "video":
|
||
file_id = m.get("file_id")
|
||
media_url = m.get("media_url")
|
||
caption = m.get("caption") or None
|
||
if not (file_id or media_url):
|
||
logger.warning("renderer: video without file_id/media_url; skipping")
|
||
continue
|
||
msg = await bot.send_video(chat_id=chat_id, video=file_id or media_url, caption=caption, reply_markup=kb)
|
||
sent.append(msg)
|
||
|
||
else:
|
||
logger.warning("renderer: unsupported message type=%s; skipping", mtype)
|
||
continue
|
||
|
||
return sent
|