345 lines
12 KiB
Python
345 lines
12 KiB
Python
import json
|
|
import os
|
|
from datetime import timedelta
|
|
from typing import Any, Dict, Optional
|
|
from urllib.parse import urlencode
|
|
|
|
from django.http import JsonResponse, HttpRequest
|
|
from django.utils import timezone
|
|
from django.views.decorators.csrf import csrf_exempt
|
|
from django.views.decorators.http import require_GET, require_http_methods
|
|
|
|
from pxy_stewards.models import Stewardship, StewardBlessing, StewardSignal
|
|
from pxy_stewards.utils import encode_geohash
|
|
from core.urlbuild import public_url
|
|
|
|
|
|
# Runtime tunables. Each value is read once, at import time, from the
# environment variable of the same name; the second argument is the default
# used when the variable is not set.

# Hours added to "now" to compute a stewardship's expires_at on activation.
STEWARD_TTL_HOURS = int(os.environ.get("STEWARD_TTL_HOURS", "168"))
# Minutes added to "now" to compute pending_until for in-progress activations.
STEWARD_SESSION_TTL_MIN = int(os.environ.get("STEWARD_SESSION_TTL_MIN", "30"))
# Precision passed to encode_geohash when a location is turned into a cell.
STEWARD_GEOHASH_PRECISION = int(os.environ.get("STEWARD_GEOHASH_PRECISION", "7"))
# Energy assigned to a steward when the stewardship is activated.
STEWARD_ENERGY_MAX = int(os.environ.get("STEWARD_ENERGY_MAX", "100"))
# Energy deducted for each blessing.
STEWARD_BLESS_COST = int(os.environ.get("STEWARD_BLESS_COST", "25"))
# Blessing duration, in minutes, when the user does not supply one.
STEWARD_BLESS_MINUTES = int(os.environ.get("STEWARD_BLESS_MINUTES", "60"))
# Multiplier stored on every blessing that is created.
STEWARD_BLESS_MULTIPLIER = float(os.environ.get("STEWARD_BLESS_MULTIPLIER", "2.0"))
|
|
|
|
# Bots a steward can choose to empower. The key is the short code carried in
# the activation button's callback params; "label" is the button text and
# "bot" is the value stored as the stewardship's target_bot.
TARGET_BOTS = {
    short_code: {"label": button_label, "bot": bot_username}
    for short_code, button_label, bot_username in (
        ("coins", "Citizens (Pepe Basurita)", "PepeBasuritaCoinsBot"),
        ("motito", "Motito (On-demand)", "PepeMotitoBot"),
        ("camion", "Camioncito (Route)", "PepeCamioncitoBot"),
    )
}
|
|
|
|
|
|
def _render_text(text: str, buttons: Optional[list] = None) -> JsonResponse:
    """Wrap a plain-text reply in a ``render.v1`` JSON response.

    Args:
        text: Body of the single text message in the response.
        buttons: Optional list of button specs. The ``buttons`` key is only
            emitted when this is non-empty.

    Returns:
        A ``JsonResponse`` carrying the render spec.
    """
    payload = {
        "schema_version": "render.v1",
        "messages": [{"type": "text", "text": text}],
    }
    if not buttons:
        return JsonResponse(payload)
    payload["buttons"] = buttons
    return JsonResponse(payload)
|
|
|
|
|
|
def _load_body(request: HttpRequest) -> Dict[str, Any]:
|
|
try:
|
|
raw = (request.body or b"").decode("utf-8")
|
|
return json.loads(raw or "{}")
|
|
except Exception:
|
|
return {}
|
|
|
|
|
|
def _extract_args(body: Dict[str, Any]) -> str:
|
|
args_raw = (body.get("input") or {}).get("args_raw") or ""
|
|
if args_raw.startswith("/"):
|
|
parts = args_raw.split(" ", 1)
|
|
return parts[1].strip() if len(parts) > 1 else ""
|
|
return args_raw.strip()
|
|
|
|
|
|
def _parse_latlon(raw: str) -> Optional[tuple[float, float]]:
|
|
parts = [p.strip() for p in (raw or "").split(",")]
|
|
if len(parts) != 2:
|
|
return None
|
|
try:
|
|
lat = float(parts[0])
|
|
lon = float(parts[1])
|
|
except Exception:
|
|
return None
|
|
return lat, lon
|
|
|
|
|
|
def _platform(body: Dict[str, Any]) -> str:
|
|
ctx = body.get("context") or {}
|
|
return (ctx.get("platform") or "telegram").strip().lower()
|
|
|
|
|
|
def _user_id(body: Dict[str, Any]) -> Optional[str]:
|
|
user = body.get("user") or {}
|
|
if user.get("id") is None:
|
|
return None
|
|
return str(user.get("id"))
|
|
|
|
|
|
def _bot_name(body: Dict[str, Any]) -> str:
|
|
bot = body.get("bot") or {}
|
|
return (bot.get("username") or "unknown").strip()
|
|
|
|
|
|
def _get_steward(platform: str, user_id: str) -> Stewardship:
    """Fetch the stewardship row for ``(platform, user_id)``, creating it if absent.

    A newly created row starts with ``bot_name="unknown"`` and ``energy=0``.

    Args:
        platform: Platform name used as part of the lookup.
        user_id: User identifier used as part of the lookup.

    Returns:
        The existing or newly created ``Stewardship``.
    """
    lookup = {"platform": platform, "user_id": user_id}
    first_time_values = {"bot_name": "unknown", "energy": 0}
    record, _created = Stewardship.objects.get_or_create(
        defaults=first_time_values,
        **lookup,
    )
    return record
|
|
|
|
|
|
def _expire_if_needed(steward: Stewardship) -> None:
    """Mark an active stewardship as expired once its ``expires_at`` has passed.

    Does nothing unless the steward is active, has an ``expires_at``, and that
    moment is not in the future. On expiry the status becomes
    ``STATUS_EXPIRED``, energy is reset to 0, and the row is saved.

    Args:
        steward: The stewardship to check; mutated and saved in place.
    """
    if steward.status != Stewardship.STATUS_ACTIVE:
        return
    if not steward.expires_at:
        return
    if timezone.now() < steward.expires_at:
        return
    steward.status = Stewardship.STATUS_EXPIRED
    steward.energy = 0
    steward.save(update_fields=["status", "energy", "updated_at"])
|
|
|
|
|
|
def _cell_taken(geohash: str, exclude_user_id: Optional[str] = None) -> bool:
    """Report whether another active, unexpired stewardship holds *geohash*.

    Args:
        geohash: Cell identifier to look up.
        exclude_user_id: When given, stewardships with this ``user_id`` are
            ignored so a user does not block themselves.

    Returns:
        ``True`` if at least one matching stewardship exists.

    NOTE(review): the exclusion matches on ``user_id`` only, while
    ``_get_steward`` identifies a steward by ``(platform, user_id)`` —
    confirm ids cannot collide across platforms.
    """
    holders = Stewardship.objects.filter(
        geohash=geohash,
        status=Stewardship.STATUS_ACTIVE,
        expires_at__gt=timezone.now(),
    )
    if not exclude_user_id:
        return holders.exists()
    return holders.exclude(user_id=exclude_user_id).exists()
|
|
|
|
|
|
def _activation_buttons() -> list:
    """Build one ``choose`` callback button per entry in ``TARGET_BOTS``.

    Returns:
        A list of button specs, in ``TARGET_BOTS`` order, each carrying the
        bot's short code under ``params["b"]``.
    """
    return [
        {
            "label": bot_meta["label"],
            "kind": "callback_api",
            "action": "choose",
            "params": {"b": short_code},
        }
        for short_code, bot_meta in TARGET_BOTS.items()
    ]
|
|
|
|
|
|
def _simulation_url(request: HttpRequest, steward: Stewardship) -> str:
    """Build the public simulation URL centred on the steward's coordinates.

    Args:
        request: Current request, forwarded to ``public_url``.
        steward: Stewardship whose ``lat`` / ``lon`` centre the simulation.
            A missing coordinate falls back to 19.4326 / -99.1332
            (central Mexico City).

    Returns:
        The ``/simulations/mvp/`` URL with ``lat``, ``lon``, ``dist`` and
        ``auto`` query parameters.
    """
    base = public_url('/simulations/mvp/', request)
    center_lat = 19.4326 if steward.lat is None else steward.lat
    center_lon = -99.1332 if steward.lon is None else steward.lon
    query = urlencode(
        {
            "lat": f"{center_lat:.6f}",
            "lon": f"{center_lon:.6f}",
            "dist": "700",
            "auto": "1",
        }
    )
    return f"{base}?{query}"
|
|
|
|
|
|
@csrf_exempt
@require_http_methods(["POST"])
def steward_handle(request: HttpRequest):
    """Webhook entry point for the steward bot: route one message or callback.

    Parses the JSON payload, loads (or creates) the caller's stewardship,
    expires it if its window has passed, records the calling bot's name, and
    dispatches on the callback trigger or command name. Unknown commands get
    the help text.

    Malformed payload pieces (non-object ``command`` / ``input`` /
    ``location`` / ``callback``, non-numeric or out-of-range coordinates,
    callback data that is not a JSON object) are treated as missing instead
    of raising.

    NOTE(review): ``pending_until`` is written by the activation steps but is
    never read in this handler, so the session TTL is not enforced here —
    confirm whether another component expires pending activations.

    Args:
        request: POST request whose body is the bot router's JSON payload.

    Returns:
        A ``render.v1`` JSON response describing the reply to show the user.
    """
    body = _as_dict(_load_body(request))
    platform = _platform(body)
    user_id = _user_id(body)
    bot_name = _bot_name(body)
    if not user_id:
        return _render_text("Missing user context. Try again from the bot.")

    command = _as_dict(body.get("command"))
    cmd = _as_text(command.get("name")).strip().lstrip("/").lower()
    trigger = _as_text(command.get("trigger")).strip().lower()
    args = _extract_args(body)
    location = _as_dict(_as_dict(body.get("input")).get("location"))

    steward = _get_steward(platform, user_id)
    _expire_if_needed(steward)
    if steward.bot_name != bot_name:
        steward.bot_name = bot_name
        steward.save(update_fields=["bot_name", "updated_at"])

    # Callbacks take precedence over whatever command name accompanies them.
    if trigger == "callback":
        return _steward_callback(steward, body)
    if cmd in {"steward", "start"}:
        return _steward_start(steward, args, location)
    if cmd == "__location__":
        return _steward_location(steward, location)
    if cmd == "status":
        return _steward_status(request, steward)
    if cmd == "bless":
        return _steward_bless(steward, args)
    if cmd == "signal":
        return _steward_signal(steward, args)

    return _render_text(
        "Steward commands:\n"
        "/steward - activate stewardship\n"
        "/status - view status\n"
        "/bless [minutes] - boost Green Points\n"
        "/signal <note> - log a cell signal"
    )


def _as_dict(value: Any) -> Dict[str, Any]:
    """Return *value* if it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _as_text(value: Any) -> str:
    """Return *value* if it is a string, otherwise an empty string."""
    return value if isinstance(value, str) else ""


def _coerce_coords(location: Dict[str, Any]) -> Optional[tuple[float, float]]:
    """Return ``(lat, lon)`` from *location* as valid floats, else ``None``."""
    try:
        lat = float(location.get("lat"))
        lon = float(location.get("lon"))
    except (TypeError, ValueError):
        # TypeError covers a missing (None) coordinate, ValueError bad text.
        return None
    # Also rejects NaN (all comparisons False) and +/-inf.
    if not (-90.0 <= lat <= 90.0 and -180.0 <= lon <= 180.0):
        return None
    return lat, lon


def _steward_callback(steward: Stewardship, body: Dict[str, Any]) -> JsonResponse:
    """Handle a button callback; ``choose`` activates the pending stewardship."""
    data_raw = _as_dict(body.get("callback")).get("data") or ""
    try:
        data = json.loads(data_raw)
    except (TypeError, ValueError, RecursionError):
        data = {}
    # Valid JSON that is not an object (list, string, number) carries no action.
    data = _as_dict(data)
    action = _as_text(data.get("a") or data.get("action")).lower()
    if action != "choose":
        return _render_text("Unknown action. Use /steward to start.")

    params = _as_dict(data.get("p") or data.get("params"))
    key = _as_text(params.get("b") or params.get("bot")).lower()
    meta = TARGET_BOTS.get(key)
    if not meta:
        return _render_text("Unknown target bot. Try /steward again.")
    if steward.status != Stewardship.STATUS_AWAITING_TARGET:
        return _render_text("No pending activation. Use /steward to start.")
    # Re-check the cell: someone else may have activated it since the
    # location step.
    if steward.geohash and _cell_taken(steward.geohash, exclude_user_id=steward.user_id):
        return _render_text("That cell already has a Steward. Choose a new location.")

    steward.target_bot = meta["bot"]
    steward.status = Stewardship.STATUS_ACTIVE
    steward.expires_at = timezone.now() + timedelta(hours=STEWARD_TTL_HOURS)
    steward.energy = STEWARD_ENERGY_MAX
    steward.pending_until = None
    steward.save()
    text = (
        f"Steward activated for cell {steward.geohash}.\n"
        f"Target bot: {meta['label']}.\n"
        f"Power window: {STEWARD_TTL_HOURS}h.\n"
        "Commands: /status /bless /signal"
    )
    return _render_text(text)


def _steward_start(steward: Stewardship, args: str, location: Dict[str, Any]) -> JsonResponse:
    """Begin activation; continues to the location step if coordinates came along."""
    if (
        steward.status == Stewardship.STATUS_ACTIVE
        and steward.expires_at
        and steward.expires_at > timezone.now()
    ):
        return _render_text(
            f"You are already Steward of {steward.geohash}.\n"
            "Use /status, /bless, or /signal."
        )

    # "lat,lon" typed after the command overrides any attached location.
    latlon = _parse_latlon(args)
    if latlon:
        location = {"lat": latlon[0], "lon": latlon[1]}

    steward.status = Stewardship.STATUS_AWAITING_LOCATION
    steward.pending_until = timezone.now() + timedelta(minutes=STEWARD_SESSION_TTL_MIN)
    steward.save()

    if location.get("lat") is not None and location.get("lon") is not None:
        return _steward_location(steward, location)
    return _render_text(
        "Steward activation started.\n"
        "Send your location to anchor your cell.\n"
        "Your identity stays private."
    )


def _steward_location(steward: Stewardship, location: Dict[str, Any]) -> JsonResponse:
    """Anchor the pending stewardship to the cell containing *location*."""
    if steward.status != Stewardship.STATUS_AWAITING_LOCATION:
        return _render_text("Send /steward to start steward activation.")
    coords = _coerce_coords(location)
    if coords is None:
        # Missing, non-numeric and out-of-range coordinates all get the same
        # prompt instead of raising from float().
        return _render_text("Location missing. Send your location to continue.")
    lat, lon = coords
    geohash = encode_geohash(lat, lon, precision=STEWARD_GEOHASH_PRECISION)
    if _cell_taken(geohash, exclude_user_id=steward.user_id):
        return _render_text(
            "That cell already has a Steward.\n"
            "Send another location to try a different cell."
        )
    steward.lat = lat
    steward.lon = lon
    steward.geohash = geohash
    steward.status = Stewardship.STATUS_AWAITING_TARGET
    steward.pending_until = timezone.now() + timedelta(minutes=STEWARD_SESSION_TTL_MIN)
    steward.save()
    text = (
        f"Cell captured: {geohash}.\n"
        "Choose which bot to empower for this cell:"
    )
    return _render_text(text, buttons=_activation_buttons())


def _steward_status(request: HttpRequest, steward: Stewardship) -> JsonResponse:
    """Report cell, target bot, energy, active blessings and remaining hours."""
    if steward.status != Stewardship.STATUS_ACTIVE or not steward.geohash:
        return _render_text("No active stewardship. Use /steward to begin.")
    active_bless = StewardBlessing.objects.filter(
        geohash=steward.geohash,
        ends_at__gt=timezone.now(),
    ).count()
    # Whole hours left, truncated toward zero.
    remaining = (
        int((steward.expires_at - timezone.now()).total_seconds() / 3600)
        if steward.expires_at
        else 0
    )
    sim_url = _simulation_url(request, steward)
    text = (
        f"Steward status\n"
        f"Cell: {steward.geohash}\n"
        f"Target bot: {steward.target_bot}\n"
        f"Energy: {steward.energy}/{STEWARD_ENERGY_MAX}\n"
        f"Blessings active: {active_bless}\n"
        f"Power window: {remaining}h\n"
        f"Simulation: {sim_url}"
    )
    return _render_text(
        text,
        buttons=[{"label": "Open Simulation", "kind": "open_url", "url": sim_url}],
    )


def _steward_bless(steward: Stewardship, args: str) -> JsonResponse:
    """Spend energy to create a time-boxed blessing on the steward's cell."""
    if steward.status != Stewardship.STATUS_ACTIVE or not steward.geohash:
        return _render_text("No active stewardship. Use /steward to begin.")
    if steward.energy < STEWARD_BLESS_COST:
        return _render_text("Not enough energy to bless. Try later.")

    minutes = STEWARD_BLESS_MINUTES
    if args:
        try:
            # A user-supplied duration is clamped to 10..240 minutes.
            minutes = max(10, min(240, int(args.split()[0])))
        except (ValueError, IndexError):
            minutes = STEWARD_BLESS_MINUTES

    starts = timezone.now()
    ends = starts + timedelta(minutes=minutes)
    StewardBlessing.objects.create(
        steward=steward,
        geohash=steward.geohash,
        target_bot=steward.target_bot or "unknown",
        multiplier=STEWARD_BLESS_MULTIPLIER,
        starts_at=starts,
        ends_at=ends,
    )
    steward.energy = max(0, steward.energy - STEWARD_BLESS_COST)
    steward.save(update_fields=["energy", "updated_at"])
    text = (
        f"Blessing active for {minutes} min.\n"
        f"Cell: {steward.geohash}\n"
        f"Boost: x{STEWARD_BLESS_MULTIPLIER:.1f} Green Points"
    )
    return _render_text(text)


def _steward_signal(steward: Stewardship, args: str) -> JsonResponse:
    """Record a free-text signal for the steward's cell."""
    if steward.status != Stewardship.STATUS_ACTIVE or not steward.geohash:
        return _render_text("No active stewardship. Use /steward to begin.")
    note = args or "cell_needs_attention"
    StewardSignal.objects.create(
        steward=steward,
        geohash=steward.geohash,
        target_bot=steward.target_bot or "unknown",
        kind="signal",
        note=note,
    )
    return _render_text(
        f"Signal recorded for {steward.geohash}.\n"
        "The city agents will adjust in your cell."
    )
|
|
|
|
|
|
@csrf_exempt
@require_GET
def blessings_active(request: HttpRequest):
    """List blessings for a cell whose ``ends_at`` is still in the future.

    Query parameters:
        geohash: Required cell identifier.
        bot: Optional target-bot name to narrow the result.

    Returns:
        ``{"ok": True, "blessings": [...]}`` on success, or a 400 response
        with ``code="missing_geohash"`` when ``geohash`` is absent or blank.
    """
    query = request.GET
    cell = (query.get("geohash") or "").strip()
    bot_filter = (query.get("bot") or "").strip()
    if not cell:
        return JsonResponse({"ok": False, "code": "missing_geohash"}, status=400)

    criteria = {"geohash": cell, "ends_at__gt": timezone.now()}
    if bot_filter:
        criteria["target_bot"] = bot_filter

    rows = []
    for blessing in StewardBlessing.objects.filter(**criteria):
        rows.append(
            {
                "geohash": blessing.geohash,
                "target_bot": blessing.target_bot,
                "multiplier": blessing.multiplier,
                "starts_at": blessing.starts_at.isoformat(),
                "ends_at": blessing.ends_at.isoformat(),
            }
        )
    return JsonResponse({"ok": True, "blessings": rows})
|
|
|
|
|
|
@csrf_exempt
@require_GET
def steward_health(request: HttpRequest):
    """Health probe: always answers with a fixed ``ok`` payload."""
    payload = {"ok": True, "service": "pxy_stewards"}
    return JsonResponse(payload)
|