import json import os from datetime import timedelta from typing import Any, Dict, Optional from urllib.parse import urlencode from django.http import JsonResponse, HttpRequest from django.utils import timezone from django.views.decorators.csrf import csrf_exempt from django.views.decorators.http import require_GET, require_http_methods from pxy_stewards.models import Stewardship, StewardBlessing, StewardSignal from pxy_stewards.utils import encode_geohash from core.urlbuild import public_url STEWARD_TTL_HOURS = int(os.getenv("STEWARD_TTL_HOURS", "168")) STEWARD_SESSION_TTL_MIN = int(os.getenv("STEWARD_SESSION_TTL_MIN", "30")) STEWARD_GEOHASH_PRECISION = int(os.getenv("STEWARD_GEOHASH_PRECISION", "7")) STEWARD_ENERGY_MAX = int(os.getenv("STEWARD_ENERGY_MAX", "100")) STEWARD_BLESS_COST = int(os.getenv("STEWARD_BLESS_COST", "25")) STEWARD_BLESS_MINUTES = int(os.getenv("STEWARD_BLESS_MINUTES", "60")) STEWARD_BLESS_MULTIPLIER = float(os.getenv("STEWARD_BLESS_MULTIPLIER", "2.0")) TARGET_BOTS = { "coins": {"label": "Citizens (Pepe Basurita)", "bot": "PepeBasuritaCoinsBot"}, "motito": {"label": "Motito (On-demand)", "bot": "PepeMotitoBot"}, "camion": {"label": "Camioncito (Route)", "bot": "PepeCamioncitoBot"}, } def _render_text(text: str, buttons: Optional[list] = None) -> JsonResponse: spec = { "schema_version": "render.v1", "messages": [{"type": "text", "text": text}], } if buttons: spec["buttons"] = buttons return JsonResponse(spec) def _load_body(request: HttpRequest) -> Dict[str, Any]: try: raw = (request.body or b"").decode("utf-8") return json.loads(raw or "{}") except Exception: return {} def _extract_args(body: Dict[str, Any]) -> str: args_raw = (body.get("input") or {}).get("args_raw") or "" if args_raw.startswith("/"): parts = args_raw.split(" ", 1) return parts[1].strip() if len(parts) > 1 else "" return args_raw.strip() def _parse_latlon(raw: str) -> Optional[tuple[float, float]]: parts = [p.strip() for p in (raw or "").split(",")] if len(parts) != 2: return None try: lat = float(parts[0]) lon = float(parts[1]) except Exception: return None return lat, lon def _platform(body: Dict[str, Any]) -> str: ctx = body.get("context") or {} return (ctx.get("platform") or "telegram").strip().lower() def _user_id(body: Dict[str, Any]) -> Optional[str]: user = body.get("user") or {} if user.get("id") is None: return None return str(user.get("id")) def _bot_name(body: Dict[str, Any]) -> str: bot = body.get("bot") or {} return (bot.get("username") or "unknown").strip() def _get_steward(platform: str, user_id: str) -> Stewardship: steward, _ = Stewardship.objects.get_or_create( platform=platform, user_id=user_id, defaults={"bot_name": "unknown", "energy": 0}, ) return steward def _expire_if_needed(steward: Stewardship) -> None: if steward.status == Stewardship.STATUS_ACTIVE and steward.expires_at: if steward.expires_at <= timezone.now(): steward.status = Stewardship.STATUS_EXPIRED steward.energy = 0 steward.save(update_fields=["status", "energy", "updated_at"]) def _cell_taken(geohash: str, exclude_user_id: Optional[str] = None) -> bool: now = timezone.now() qs = Stewardship.objects.filter( geohash=geohash, status=Stewardship.STATUS_ACTIVE, expires_at__gt=now, ) if exclude_user_id: qs = qs.exclude(user_id=exclude_user_id) return qs.exists() def _activation_buttons() -> list: buttons = [] for key, meta in TARGET_BOTS.items(): buttons.append({ "label": meta["label"], "kind": "callback_api", "action": "choose", "params": {"b": key}, }) return buttons def _simulation_url(request: HttpRequest, steward: Stewardship) -> str: lat = steward.lat if steward.lat is not None else 19.4326 lon = steward.lon if steward.lon is not None else -99.1332 params = { "lat": f"{lat:.6f}", "lon": f"{lon:.6f}", "dist": "700", "auto": "1", } return f"{public_url('/simulations/mvp/', request)}?{urlencode(params)}" @csrf_exempt @require_http_methods(["POST"]) def steward_handle(request: HttpRequest): body = _load_body(request) platform = _platform(body) user_id = _user_id(body) bot_name = _bot_name(body) if not user_id: return _render_text("Missing user context. Try again from the bot.") cmd = ((body.get("command") or {}).get("name") or "").strip().lstrip("/").lower() trigger = ((body.get("command") or {}).get("trigger") or "").strip().lower() args = _extract_args(body) location = (body.get("input") or {}).get("location") or {} steward = _get_steward(platform, user_id) _expire_if_needed(steward) if steward.bot_name != bot_name: steward.bot_name = bot_name steward.save(update_fields=["bot_name", "updated_at"]) if trigger == "callback": data_raw = ((body.get("callback") or {}).get("data") or "") try: data = json.loads(data_raw) except Exception: data = {} action = (data.get("a") or data.get("action") or "").lower() params = data.get("p") or data.get("params") or {} if action == "choose": key = (params.get("b") or params.get("bot") or "").lower() meta = TARGET_BOTS.get(key) if not meta: return _render_text("Unknown target bot. Try /steward again.") if steward.status != Stewardship.STATUS_AWAITING_TARGET: return _render_text("No pending activation. Use /steward to start.") if steward.geohash and _cell_taken(steward.geohash, exclude_user_id=steward.user_id): return _render_text("That cell already has a Steward. Choose a new location.") steward.target_bot = meta["bot"] steward.status = Stewardship.STATUS_ACTIVE steward.expires_at = timezone.now() + timedelta(hours=STEWARD_TTL_HOURS) steward.energy = STEWARD_ENERGY_MAX steward.pending_until = None steward.save() text = ( f"Steward activated for cell {steward.geohash}.\n" f"Target bot: {meta['label']}.\n" f"Power window: {STEWARD_TTL_HOURS}h.\n" "Commands: /status /bless /signal" ) return _render_text(text) return _render_text("Unknown action. Use /steward to start.") if cmd in {"steward", "start"}: if steward.status == Stewardship.STATUS_ACTIVE and steward.expires_at and steward.expires_at > timezone.now(): return _render_text( f"You are already Steward of {steward.geohash}.\n" "Use /status, /bless, or /signal." ) latlon = _parse_latlon(args) if latlon: location = {"lat": latlon[0], "lon": latlon[1]} steward.status = Stewardship.STATUS_AWAITING_LOCATION steward.pending_until = timezone.now() + timedelta(minutes=STEWARD_SESSION_TTL_MIN) steward.save() if location.get("lat") is not None and location.get("lon") is not None: cmd = "__location__" else: return _render_text( "Steward activation started.\n" "Send your location to anchor your cell.\n" "Your identity stays private." ) if cmd == "__location__": if steward.status != Stewardship.STATUS_AWAITING_LOCATION: return _render_text("Send /steward to start steward activation.") if location.get("lat") is None or location.get("lon") is None: return _render_text("Location missing. Send your location to continue.") lat = float(location.get("lat")) lon = float(location.get("lon")) geohash = encode_geohash(lat, lon, precision=STEWARD_GEOHASH_PRECISION) if _cell_taken(geohash, exclude_user_id=steward.user_id): return _render_text( "That cell already has a Steward.\n" "Send another location to try a different cell." ) steward.lat = lat steward.lon = lon steward.geohash = geohash steward.status = Stewardship.STATUS_AWAITING_TARGET steward.pending_until = timezone.now() + timedelta(minutes=STEWARD_SESSION_TTL_MIN) steward.save() text = ( f"Cell captured: {geohash}.\n" "Choose which bot to empower for this cell:" ) return _render_text(text, buttons=_activation_buttons()) if cmd == "status": if steward.status != Stewardship.STATUS_ACTIVE or not steward.geohash: return _render_text("No active stewardship. Use /steward to begin.") active_bless = StewardBlessing.objects.filter( geohash=steward.geohash, ends_at__gt=timezone.now(), ).count() remaining = int((steward.expires_at - timezone.now()).total_seconds() / 3600) if steward.expires_at else 0 sim_url = _simulation_url(request, steward) text = ( f"Steward status\n" f"Cell: {steward.geohash}\n" f"Target bot: {steward.target_bot}\n" f"Energy: {steward.energy}/{STEWARD_ENERGY_MAX}\n" f"Blessings active: {active_bless}\n" f"Power window: {remaining}h\n" f"Simulation: {sim_url}" ) return _render_text(text, buttons=[{"label": "Open Simulation", "kind": "open_url", "url": sim_url}]) if cmd == "bless": if steward.status != Stewardship.STATUS_ACTIVE or not steward.geohash: return _render_text("No active stewardship. Use /steward to begin.") if steward.energy < STEWARD_BLESS_COST: return _render_text("Not enough energy to bless. Try later.") minutes = STEWARD_BLESS_MINUTES if args: try: minutes = max(10, min(240, int(args.split()[0]))) except Exception: minutes = STEWARD_BLESS_MINUTES starts = timezone.now() ends = starts + timedelta(minutes=minutes) StewardBlessing.objects.create( steward=steward, geohash=steward.geohash, target_bot=steward.target_bot or "unknown", multiplier=STEWARD_BLESS_MULTIPLIER, starts_at=starts, ends_at=ends, ) steward.energy = max(0, steward.energy - STEWARD_BLESS_COST) steward.save(update_fields=["energy", "updated_at"]) text = ( f"Blessing active for {minutes} min.\n" f"Cell: {steward.geohash}\n" f"Boost: x{STEWARD_BLESS_MULTIPLIER:.1f} Green Points" ) return _render_text(text) if cmd == "signal": if steward.status != Stewardship.STATUS_ACTIVE or not steward.geohash: return _render_text("No active stewardship. Use /steward to begin.") note = args or "cell_needs_attention" StewardSignal.objects.create( steward=steward, geohash=steward.geohash, target_bot=steward.target_bot or "unknown", kind="signal", note=note, ) return _render_text( f"Signal recorded for {steward.geohash}.\n" "The city agents will adjust in your cell." ) return _render_text( "Steward commands:\n" "/steward - activate stewardship\n" "/status - view status\n" "/bless [minutes] - boost Green Points\n" "/signal - log a cell signal" ) @csrf_exempt @require_GET def blessings_active(request: HttpRequest): geohash = (request.GET.get("geohash") or "").strip() target_bot = (request.GET.get("bot") or "").strip() if not geohash: return JsonResponse({"ok": False, "code": "missing_geohash"}, status=400) now = timezone.now() qs = StewardBlessing.objects.filter(geohash=geohash, ends_at__gt=now) if target_bot: qs = qs.filter(target_bot=target_bot) data = [ { "geohash": b.geohash, "target_bot": b.target_bot, "multiplier": b.multiplier, "starts_at": b.starts_at.isoformat(), "ends_at": b.ends_at.isoformat(), } for b in qs ] return JsonResponse({"ok": True, "blessings": data}) @csrf_exempt @require_GET def steward_health(request: HttpRequest): return JsonResponse({"ok": True, "service": "pxy_stewards"})