#!/usr/bin/env python3 """ mc_aigod.py — Minecraft AI God watcher Intercepts /pray commands, fetches live server state, queries Ollama, validates targets, and executes commands via RCON. Config: /etc/mc_aigod.json """ import json, os, random, re, socket, struct, threading, time, logging from collections import deque from datetime import datetime from typing import Any, Dict import shutil from urllib.parse import parse_qs, unquote, urljoin, urlparse import requests logging.basicConfig( level=logging.INFO, format='%(asctime)s [aigod] %(levelname)s: %(message)s', handlers=[ logging.StreamHandler(), logging.FileHandler('/var/log/mc_aigod_paper.log'), ] ) log = logging.getLogger(__name__) CONFIG_PATH = os.environ.get('MC_AIGOD_CONFIG', '/etc/mc_aigod_paper.json') PRAY_PATTERNS = [ re.compile(r'\[.*?\]: (?:\[Not Secure\] )?<(\w+)> [Pp]ray (.+)'), ] BIBLE_PATTERNS = [ re.compile(r'\[.*?\]: (?:\[Not Secure\] )?<(\w+)> [Bb]ible\s*$'), ] SUDO_PATTERNS = [ re.compile(r'\[.*?\]: (?:\[Not Secure\] )?<(\w+)> [Ss]udo (.+)'), ] BUG_LOG_PATTERNS = [ re.compile(r'\[.*?\]: (?:\[Not Secure\] )?<(\w+)> [Bb]ug[_ ]?[Ll]og(?:\s+(.+))?\s*$'), ] JOIN_PATTERN = re.compile(r'\[.*?\]: (\w+) joined the game') LEAVE_PATTERN = re.compile(r'\[.*?\]: (\w+) left the game') # Interesting server events to capture for God's awareness # Matches: chat, deaths, joins, leaves — skips RCON/thread noise LOG_INTERESTING = re.compile( r'\[.*?(?:Server thread|Async Chat).*?INFO\]: ' r'(?:<\w+>.*|' # chat r'\w+ joined the game|' # join r'\w+ left the game|' # leave r'\w+ (?:died|was slain|was shot|drowned|fell|burned|blew up|suffocated|starved|was killed|hit the ground|went up in flames|tried to swim in lava).*)' ) # --------------------------------------------------------------------------- # Shared memory buffers (module-level, shared across threads) # --------------------------------------------------------------------------- # Rolling log buffer — keeps last LOG_MAX_LINES interesting events, # pruning anything older than LOG_MAX_HOURS. Whichever limit hits first. LOG_MAX_LINES = 200 LOG_MAX_HOURS = 3 recent_log: deque = deque() # entries: (timestamp_float, str) # God's prayer memory — last N prayer/response pairs across all players # Stored as (player, prayer_text, god_message) tuples PRAYER_MEMORY_SIZE = 10 prayer_memory: deque = deque() # entries: (player, prayer, god_message) # First-login benevolence memory — players already blessed on first join first_login_seen = set() # Sudo translator memory — last N sudo translations/executions SUDO_HISTORY_SIZE = 10 sudo_history: deque = deque() # entries: (ts, player, prompt, translated_cmds, executed_cmds) # Sudo failure memory — last N failed command/result pairs, for anti-repeat context. SUDO_FAILURE_SIZE = 20 sudo_failures: deque = deque() # entries: (ts, player, command, error) # Last sudo execution feedback by player for follow-up questions. last_sudo_feedback: Dict[str, Dict[str, Any]] = {} _memory_lock = threading.Lock() # Gateway client session mapping (player+mode -> session_id) _gateway_sessions = {} _gateway_lock = threading.Lock() DEFAULT_TEMPLATE_DIR = "/opt/paper-ai-25567/worldkit_templates" DEFAULT_TEMPLATE_HOSTS = [ "raw.githubusercontent.com", "github.com", "planetminecraft.com", "www.planetminecraft.com", "modrinth.com", "cdn.modrinth.com", ] DEFAULT_TEMPLATE_SEARCH_BLOCKED_HOSTS = [ "planetminecraft.com", "www.planetminecraft.com", ] DEFAULT_FAWE_SCHEM_DIR = "/opt/paper-ai-25567/plugins/FastAsyncWorldEdit/schematics" TEMPLATE_SEARCH_CACHE_SIZE = 20 _template_search_cache = {} _template_search_lock = threading.Lock() _template_last_downloaded = {} def add_log_event(line: str): """Add a meaningful log line to the rolling buffer.""" if not LOG_INTERESTING.search(line): return m = re.search(r'\[.*?INFO\]: (.+)', line) if not m: return entry = m.group(1).strip() now = time.time() with _memory_lock: recent_log.append((now, entry)) # Prune by age cutoff = now - (LOG_MAX_HOURS * 3600) while recent_log and recent_log[0][0] < cutoff: recent_log.popleft() # Prune by line count while len(recent_log) > LOG_MAX_LINES: recent_log.popleft() def _send_private(player: str, text: str, config, color: str = "gray", italic: bool = False): safe = text.replace("\\", "\\\\").replace('"', '\\"').replace("\n", " ").replace("\r", " ") italic_raw = "true" if italic else "false" rcon( f'tellraw {player} {{"text":"{safe}","color":"{color}","italic":{italic_raw}}}', config["rcon_host"], config["rcon_port"], config["rcon_password"], ) def _sudo_trace(player: str, text: str, config, color: str = "dark_gray"): if not bool(config.get("sudo_trace_commands", True)): return _send_private(player, text, config, color=color) def _template_path(config) -> str: return config.get("template_dir", DEFAULT_TEMPLATE_DIR) def _fawe_schem_path(config) -> str: return config.get("fawe_schem_dir", DEFAULT_FAWE_SCHEM_DIR) def _template_hosts(config) -> list: hosts = config.get("template_allowed_hosts", DEFAULT_TEMPLATE_HOSTS) return [str(h).lower().strip() for h in hosts if str(h).strip()] def _template_search_blocked_hosts(config) -> list: hosts = config.get("template_search_blocked_hosts", DEFAULT_TEMPLATE_SEARCH_BLOCKED_HOSTS) return [str(h).lower().strip() for h in hosts if str(h).strip()] def _is_search_blocked_host(url: str, config) -> bool: try: host = urlparse(url).netloc.lower() except Exception: return False for h in _template_search_blocked_hosts(config): if host == h or host.endswith("." + h): return True return False def _safe_template_name(name: str) -> str: n = re.sub(r'[^a-zA-Z0-9._-]+', '_', name.strip()) return n[:80] if n else f"template_{int(time.time())}" def _is_allowed_template_url(url: str, config) -> bool: try: p = urlparse(url) if p.scheme not in ("https",): return False host = (p.netloc or "").lower() allowed = _template_hosts(config) return any(host == h or host.endswith("." + h) for h in allowed) except Exception: return False def _extract_ddg_links_from_html(html: str) -> list: links = [] for href in re.findall(r'href="([^"]+)"', html): if "/l/?" not in href: continue parsed = urlparse(href) qs = parse_qs(parsed.query) target = (qs.get("uddg") or [""])[0] if not target: continue url = unquote(target) if url.startswith("http://") or url.startswith("https://"): links.append(url) return links def _search_templates(query: str, config) -> list: # No page scraping: return direct file URLs only. limit = int(config.get("template_search_limit", 8)) out = [] seen = set() qlow = query.lower().strip() def add_url(url: str, text: str = ""): if url in seen: return if not _is_allowed_template_url(url, config): return path = urlparse(url).path.lower() if not re.search(r'\.(schem|schematic|nbt|zip)$', path): return seen.add(url) out.append({"url": url, "text": text[:160]}) # First: include configured sync catalog entries that match query. try: for e in _template_sync_entries(config): u = e.get("url", "") nm = e.get("name", "") blob = f"{u} {nm}".lower() if qlow and qlow not in blob: continue add_url(u, "sync source") if len(out) >= limit: return out except Exception as e: log.debug(f"Template sync catalog presearch failed: {e}") for ext in ("schem", "schematic", "nbt", "zip"): if len(out) >= limit: break try: r = requests.get( "https://api.github.com/search/code", params={"q": f"{query} extension:{ext} in:path", "per_page": 10}, timeout=20, headers={"Accept": "application/vnd.github+json"}, ) if r.status_code != 200: continue data = r.json() for item in data.get("items") or []: html_url = item.get("html_url") or "" if not html_url: continue p = urlparse(html_url) if p.netloc.lower() == "github.com" and "/blob/" in p.path: raw_path = p.path.replace("/blob/", "/", 1) raw = f"https://raw.githubusercontent.com{raw_path}" add_url(raw, "github code search") if len(out) >= limit: break except Exception as e: log.debug(f"Template GitHub code search failed: {e}") return out def _format_bytes(n: int) -> str: if n < 1024: return f"{n}B" if n < 1024 * 1024: return f"{n/1024:.1f}KB" return f"{n/(1024*1024):.1f}MB" def _cache_template_search(player: str, rows: list): urls = [str(r.get("url", "")).strip() for r in rows if isinstance(r, dict)] urls = [u for u in urls if u] with _template_search_lock: _template_search_cache[player] = { "ts": time.time(), "urls": urls[:TEMPLATE_SEARCH_CACHE_SIZE], } def _get_cached_template_url(player: str, token: str) -> str: tok = token.strip().strip("[](){}\\") if tok.startswith("#"): tok = tok[1:] if not tok.isdigit(): return "" idx = int(tok) if idx <= 0: return "" with _template_search_lock: entry = _template_search_cache.get(player) or {} urls = entry.get("urls") or [] if idx > len(urls): return "" return urls[idx - 1] def _get_cached_template_urls(player: str) -> list: with _template_search_lock: entry = _template_search_cache.get(player) or {} urls = entry.get("urls") or [] return [str(u) for u in urls if str(u).strip()] def _list_templates(config, max_rows: int = 12) -> list: root = _template_path(config) os.makedirs(root, exist_ok=True) rows = [] for name in sorted(os.listdir(root)): full = os.path.join(root, name) if not os.path.isfile(full): continue try: size = os.path.getsize(full) except OSError: size = 0 rows.append({"name": name, "size": size}) rows.sort(key=lambda x: x["name"].lower()) return rows[:max_rows] def _delete_template(name: str, config) -> str: safe = os.path.basename(name.strip()) if not safe: raise ValueError("Missing template filename") if safe != name.strip(): raise ValueError("Invalid template filename") full = os.path.join(_template_path(config), safe) if not os.path.isfile(full): raise ValueError("Template file not found") os.remove(full) return full def _download_template(url: str, name_hint: str, config) -> str: return _download_template_inner(url, name_hint, config, depth=0) def _template_sync_entries(config) -> list: entries = [] for url in config.get("template_sync_urls", []) or []: u = str(url).strip() if u: entries.append({"url": u, "name": ""}) manifest_url = str(config.get("template_sync_manifest_url", "")).strip() if manifest_url: r = requests.get(manifest_url, timeout=25) r.raise_for_status() data = r.json() if isinstance(data, list): for item in data: if isinstance(item, str): entries.append({"url": item.strip(), "name": ""}) elif isinstance(item, dict): entries.append({"url": str(item.get("url", "")).strip(), "name": str(item.get("name", "")).strip()}) elif isinstance(data, dict): for item in data.get("templates", []) or []: if isinstance(item, str): entries.append({"url": item.strip(), "name": ""}) elif isinstance(item, dict): entries.append({"url": str(item.get("url", "")).strip(), "name": str(item.get("name", "")).strip()}) deduped = [] seen = set() for e in entries: u = e.get("url", "").strip() if not u or u in seen: continue if not _is_allowed_template_url(u, config): continue seen.add(u) deduped.append({"url": u, "name": e.get("name", "").strip()}) return deduped def _template_sync(config) -> dict: entries = _template_sync_entries(config) if not entries: return {"ok": 0, "failed": 0, "errors": ["No template sync sources configured"]} max_files = int(config.get("template_sync_max_files", 50)) ok = 0 failed = 0 errors = [] for e in entries[:max_files]: try: _download_template(e["url"], e.get("name", ""), config) ok += 1 except Exception as ex: failed += 1 errors.append(f"{e['url']}: {ex}") return {"ok": ok, "failed": failed, "errors": errors[:6]} def _extract_template_candidates_from_html(base_url: str, html: str, config) -> list: candidates = [] seen = set() for href in re.findall(r'href=["\']([^"\']+)["\']', html, re.IGNORECASE): full = urljoin(base_url, href.strip()) if full in seen: continue seen.add(full) p = urlparse(full) host = p.netloc.lower() path = p.path.lower() if host == "github.com" and "/blob/" in p.path and re.search(r'\.(schem|schematic|nbt|zip)$', path): raw_path = p.path.replace("/blob/", "/", 1) raw = f"https://raw.githubusercontent.com{raw_path}" if _is_allowed_template_url(raw, config): candidates.append(raw) continue if re.search(r'\.(schem|schematic|nbt|zip)$', path): if _is_allowed_template_url(full, config): candidates.append(full) return candidates def _extract_template_page_candidates_from_html(base_url: str, html: str, config) -> list: pages = [] seen = set() base_host = urlparse(base_url).netloc.lower() for href in re.findall(r'href=["\']([^"\']+)["\']', html, re.IGNORECASE): full = urljoin(base_url, href.strip()) if full in seen: continue seen.add(full) p = urlparse(full) host = p.netloc.lower() path = p.path.lower() if _is_search_blocked_host(full, config): continue if not _is_allowed_template_url(full, config): continue if re.search(r'\.(schem|schematic|nbt|zip)$', path): continue if re.search(r'\.(png|jpg|jpeg|gif|webp|svg|ico|css|js|txt|json|xml|pdf)$', path): continue if '/search' in path or '/projects/' == path or path.endswith('/projects/'): continue # Prefer detail pages on same domain as seed page. if host == base_host or 'planetminecraft.com' in host or 'modrinth.com' in host or host == 'github.com': pages.append(full) return pages[:6] def _looks_like_template_binary(first_bytes: bytes, content_type: str) -> bool: if not first_bytes: return False # Known binary containers used by schematics/templates. if first_bytes.startswith(b"\x1f\x8b"): # gzip return True if first_bytes.startswith(b"PK\x03\x04"): # zip return True if first_bytes[:1] == b"\x0a": # NBT tag root (common for uncompressed NBT) return True # Common non-template binary signatures to reject. if first_bytes.startswith(b"\x89PNG\r\n\x1a\n"): return False if first_bytes.startswith(b"\xff\xd8\xff"): # jpeg return False if first_bytes.startswith(b"GIF87a") or first_bytes.startswith(b"GIF89a"): return False if first_bytes.startswith(b"%PDF"): return False ct = (content_type or "").lower() if ct.startswith("text/") or "html" in ct or "css" in ct or "javascript" in ct or "json" in ct: return False # Heuristic: templates should not look like plain text/CSS/HTML. sample = first_bytes[:256] printable = sum(1 for b in sample if 9 <= b <= 13 or 32 <= b <= 126) ratio = printable / max(1, len(sample)) low = sample.lower() if ratio > 0.9 and (b" 0.97: return False # If it is not one of known schematic container signatures, reject. return False def _download_template_inner(url: str, name_hint: str, config, depth: int = 0, visited=None) -> str: if visited is None: visited = set() if url in visited: raise ValueError("Template URL loop detected") visited.add(url) if depth > 2: raise ValueError("Could not resolve a direct template file URL") p_in = urlparse(url) if p_in.netloc.lower() == "github.com" and "/blob/" in p_in.path: # Convert common GitHub blob URL to raw URL automatically. raw_path = p_in.path.replace("/blob/", "/", 1) url = f"https://raw.githubusercontent.com{raw_path}" if not _is_allowed_template_url(url, config): raise ValueError("Template URL host not allowed") p = urlparse(url) base = os.path.basename(p.path) or "template.schem" if "." in base: ext = "." + base.split(".")[-1].lower() else: ext = ".schem" if ext not in (".schem", ".schematic", ".nbt", ".zip"): ext = ".schem" fname = _safe_template_name(name_hint or os.path.splitext(base)[0]) + ext out_dir = _template_path(config) os.makedirs(out_dir, exist_ok=True) out_path = os.path.join(out_dir, fname) try: with requests.get(url, stream=True, timeout=45) as r: if r.status_code == 403: raise ValueError(f"Source blocked automated download (403): {urlparse(url).netloc}") r.raise_for_status() content_type = (r.headers.get("Content-Type") or "").lower() if "text/html" in content_type: raise ValueError("URL returned HTML page. template download requires a direct file URL") total = 0 max_bytes = int(config.get("template_max_bytes", 10 * 1024 * 1024)) first_bytes = b"" with open(out_path, "wb") as f: for chunk in r.iter_content(chunk_size=65536): if not chunk: continue if not first_bytes: first_bytes = chunk[:512] if not _looks_like_template_binary(first_bytes, content_type): raise ValueError("Downloaded content is not a valid binary template file") total += len(chunk) if total > max_bytes: raise ValueError("Template file too large") f.write(chunk) if os.path.isfile(out_path) and os.path.getsize(out_path) == 0: raise ValueError("Downloaded empty template file") except Exception: try: if os.path.isfile(out_path): os.remove(out_path) except Exception: pass raise return out_path def _cache_last_download(player: str, path: str): with _template_search_lock: _template_last_downloaded[player] = path def _last_download(player: str) -> str: with _template_search_lock: return _template_last_downloaded.get(player, "") def _resolve_template_file(name_or_file: str, player: str, config) -> str: token = (name_or_file or "").strip().strip("[](){}\\") if not token: return _last_download(player) root = _template_path(config) direct = os.path.join(root, os.path.basename(token)) if os.path.isfile(direct): return direct stem = os.path.basename(token) for ext in (".schem", ".schematic", ".nbt", ".zip"): cand = os.path.join(root, stem + ext) if os.path.isfile(cand): return cand return "" def _stage_template_for_fawe(src_path: str, config) -> str: schem_dir = _fawe_schem_path(config) os.makedirs(schem_dir, exist_ok=True) base = os.path.basename(src_path) name_no_ext = _safe_template_name(os.path.splitext(base)[0]) dest = os.path.join(schem_dir, name_no_ext + ".schem") shutil.copyfile(src_path, dest) return name_no_ext def _info_lookup_wiki(query: str) -> list: try: r = requests.get( "https://minecraft.wiki/api.php", params={ "action": "query", "list": "search", "srsearch": query, "srlimit": 3, "format": "json", }, timeout=20, ) r.raise_for_status() data = r.json() rows = [] for it in (data.get("query", {}).get("search", []) or [])[:3]: title = str(it.get("title", "")).strip() if not title: continue rows.append(f"minecraft.wiki: {title}") return rows except Exception: return [] def _info_lookup_web(query: str) -> list: try: r = requests.get( "https://api.duckduckgo.com/", params={"q": query, "format": "json", "no_redirect": 1, "no_html": 1}, timeout=20, ) r.raise_for_status() data = r.json() rows = [] abs_text = (data.get("AbstractText") or "").strip() if abs_text: rows.append(abs_text[:220]) heading = (data.get("Heading") or "").strip() if heading and not abs_text: rows.append(f"Topic: {heading}") for it in data.get("RelatedTopics") or []: if isinstance(it, dict) and it.get("Text"): rows.append(str(it.get("Text"))[:220]) if len(rows) >= 3: break return rows[:3] except Exception: return [] def process_sudo_template_command(player: str, prompt: str, config) -> bool: """ Handle deterministic template management via sudo. Returns True if prompt was a template command and has been handled. """ raw = prompt.strip() p = raw.lower() if not p.startswith("template "): return False if not ( p == "template help" or p.startswith("template search ") or p.startswith("template download ") or p.startswith("template install ") or p.startswith("template pick ") or p == "template sync" or p.startswith("template build") or p == "template list" or p.startswith("template delete ") or p == "template hosts" ): _send_private(player, "[SUDO-TEMPLATE] Unknown template command. Use: template help", config, "yellow") return True try: if p == "template help": _send_private(player, "[SUDO-TEMPLATE] Commands:", config, "aqua") _send_private(player, "- template search ", config, "gray") _send_private(player, "- template download [name]", config, "gray") _send_private(player, "- template install [name]", config, "gray") _send_private(player, "- template pick [name] (download from last search)", config, "gray") _send_private(player, "- template sync (pull from configured sync sources)", config, "gray") _send_private(player, "- template build (or no arg = last download)", config, "gray") _send_private(player, "- template list", config, "gray") _send_private(player, "- template delete ", config, "gray") _send_private(player, "- template hosts", config, "gray") return True if p == "template hosts": _send_private(player, "[SUDO-TEMPLATE] Allowed hosts:", config, "aqua") for h in _template_hosts(config): _send_private(player, f"- {h}", config, "gray") return True if p == "template list": rows = _list_templates(config) if not rows: _send_private(player, "[SUDO-TEMPLATE] Template folder is empty.", config, "yellow") return True _send_private(player, "[SUDO-TEMPLATE] Local templates:", config, "aqua") for row in rows: _send_private(player, f"- {row['name']} ({_format_bytes(row['size'])})", config, "gray") return True if p.startswith("template delete "): name = raw[len("template delete "):].strip() if not name: _send_private(player, "[SUDO-TEMPLATE] Usage: template delete ", config, "yellow") return True deleted = _delete_template(name, config) _send_private(player, f"[SUDO-TEMPLATE] Deleted: {deleted}", config, "green") return True if p.startswith("template search "): query = raw[len("template search "):].strip() if not query: _send_private(player, "[SUDO-TEMPLATE] Usage: template search ", config, "yellow") return True rows = _search_templates(query, config) if not rows: _send_private(player, "[SUDO-TEMPLATE] No direct template links found.", config, "yellow") _send_private(player, "[SUDO-TEMPLATE] Tip: configure sync sources then run: template sync", config, "dark_aqua") return True _cache_template_search(player, rows) _send_private(player, "[SUDO-TEMPLATE] Top results:", config, "aqua") for i, row in enumerate(rows[:8], 1): _send_private(player, f"{i}) {row['url']}", config, "gray") _send_private(player, "[SUDO-TEMPLATE] Tip: use 'sudo template pick [name]'.", config, "dark_aqua") return True if p.startswith("template pick "): rest = raw[len("template pick "):].strip() if not rest: _send_private(player, "[SUDO-TEMPLATE] Usage: template pick [name]", config, "yellow") return True parts = rest.split() token = parts[0] name_start = 1 if token.lower() in ("option", "result") and len(parts) > 1: token = parts[1] name_start = 2 url = _get_cached_template_url(player, token) if not url: _send_private(player, "[SUDO-TEMPLATE] Invalid index. Run template search first.", config, "yellow") return True name_hint = " ".join(parts[name_start:]).strip() if len(parts) > name_start else "" try: out_path = _download_template(url, name_hint, config) except Exception as first_err: out_path = "" urls = _get_cached_template_urls(player) for fallback_url in urls: if fallback_url == url: continue try: out_path = _download_template(fallback_url, name_hint, config) break except Exception: continue if not out_path: raise first_err _cache_last_download(player, out_path) _send_private(player, f"[SUDO-TEMPLATE] Downloaded from #{token.lstrip('#')}: {out_path}", config, "green") return True if p == "template sync": _send_private(player, "[SUDO-TEMPLATE] Syncing templates...", config, "aqua") res = _template_sync(config) _send_private(player, f"[SUDO-TEMPLATE] Sync complete: ok={res['ok']} failed={res['failed']}", config, "green" if res['failed'] == 0 else "yellow") for err in res.get("errors", [])[:3]: _send_private(player, f"[SUDO-TEMPLATE] {err}", config, "gray") return True if p.startswith("template build"): if not get_server_capabilities(config).get("template_build", False): _send_private(player, "[SUDO-TEMPLATE] template build requires a Paper server with WorldEdit/FAWE.", config, "yellow") return True rest = raw[len("template build"):].strip() src = _resolve_template_file(rest, player, config) if not src: _send_private(player, "[SUDO-TEMPLATE] Template not found. Use template list or download first.", config, "yellow") return True staged_name = _stage_template_for_fawe(src, config) _send_private(player, f"[SUDO-TEMPLATE] Staged for FAWE: {os.path.basename(src)}", config, "aqua") _send_private(player, f"[SUDO-TEMPLATE] Run in chat: //schem load {staged_name}", config, "gray") _send_private(player, "[SUDO-TEMPLATE] Then run: //paste -a", config, "gray") return True cmd = "template download " if p.startswith("template download ") else "template install " rest = raw[len(cmd):].strip() if not rest: _send_private(player, "[SUDO-TEMPLATE] Usage: template download [name]", config, "yellow") return True parts = rest.split() first = parts[0] token = first name_start = 1 if first.lower() in ("option", "result") and len(parts) > 1: token = parts[1] name_start = 2 cached_url = _get_cached_template_url(player, token) url = cached_url or first if cached_url: name_hint = " ".join(parts[name_start:]).strip() if len(parts) > name_start else "" else: name_hint = " ".join(parts[1:]).strip() if len(parts) > 1 else "" out_path = _download_template(url, name_hint, config) _cache_last_download(player, out_path) _send_private(player, f"[SUDO-TEMPLATE] Downloaded: {out_path}", config, "green") return True except Exception as e: log.warning(f"Template sudo command failed: {e}") _send_private(player, f"[SUDO-TEMPLATE] Failed: {e}", config, "red") return True def _memory_path(config) -> str: return config.get( "memory_path", "/opt/mcsmanager/daemon/data/InstanceData/shrinkborder1234567890abcdef12345/aigod_memory.json" ) def save_prayer_memory(config): """Persist prayer memory to disk.""" try: path = _memory_path(config) with _memory_lock: data = list(prayer_memory) with open(path, 'w') as f: json.dump(data, f) log.debug(f"Prayer memory saved ({len(data)} entries)") except Exception as e: log.warning(f"Could not save prayer memory: {e}") def load_prayer_memory(config): """Load prayer memory from disk on startup.""" try: path = _memory_path(config) with open(path) as f: data = json.load(f) with _memory_lock: prayer_memory.clear() for entry in data[-PRAYER_MEMORY_SIZE:]: prayer_memory.append(tuple(entry)) log.info(f"Prayer memory loaded ({len(data)} entries from {path})") except FileNotFoundError: log.info("No prayer memory file found — starting fresh.") except Exception as e: log.warning(f"Could not load prayer memory: {e}") def add_prayer_memory(player: str, prayer: str, god_message: str, config=None): """Record a completed prayer exchange and persist to disk.""" with _memory_lock: prayer_memory.append((player, prayer[:200], god_message[:300])) while len(prayer_memory) > PRAYER_MEMORY_SIZE: prayer_memory.popleft() if config: save_prayer_memory(config) def _first_login_path(config) -> str: return config.get( "first_login_path", "/opt/mcsmanager/daemon/data/InstanceData/shrinkborder1234567890abcdef12345/aigod_first_login_seen.json" ) def load_first_login_seen(config): """Load first-login blessing memory from disk.""" try: path = _first_login_path(config) with open(path) as f: data = json.load(f) with _memory_lock: first_login_seen.clear() for name in data: first_login_seen.add(str(name)) log.info(f"First-login memory loaded ({len(data)} players from {path})") except FileNotFoundError: log.info("No first-login memory file found — starting fresh.") except Exception as e: log.warning(f"Could not load first-login memory: {e}") def save_first_login_seen(config): """Persist first-login blessing memory to disk.""" try: path = _first_login_path(config) with _memory_lock: data = sorted(first_login_seen) with open(path, 'w') as f: json.dump(data, f) except Exception as e: log.warning(f"Could not save first-login memory: {e}") def has_first_login_seen(player: str) -> bool: with _memory_lock: return player in first_login_seen def mark_first_login_seen(player: str, config): with _memory_lock: first_login_seen.add(player) save_first_login_seen(config) def get_log_context_block() -> str: """Return recent server events as a formatted string for the LLM.""" with _memory_lock: entries = list(recent_log) if not entries: return "" now = time.time() lines = [] for ts, entry in entries: mins_ago = int((now - ts) / 60) if mins_ago < 60: time_label = f"{mins_ago}m ago" else: time_label = f"{mins_ago // 60}h {mins_ago % 60}m ago" lines.append(f" [{time_label}] {entry}") return f"\n=== RECENT SERVER EVENTS (last {len(lines)} events, up to {LOG_MAX_HOURS}h) ===\n" + "\n".join(lines) + "\n" def get_prayer_history_messages() -> list: """ Return prayer memory as alternating user/assistant message dicts for insertion into the Ollama messages array before the current prayer. """ with _memory_lock: history = list(prayer_memory) messages = [] for player, prayer, god_msg in history: messages.append({"role": "user", "content": f"{player} prayed: {prayer}"}) messages.append({"role": "assistant", "content": f'{{"message": "{god_msg}", "commands": []}}'}) return messages def add_sudo_history(player: str, prompt: str, translated_cmds: list, executed_cmds: list): """Record a sudo translation/execution so future sudo requests can reference it.""" with _memory_lock: sudo_history.append((time.time(), player, prompt[:220], translated_cmds[:6], executed_cmds[:6])) while len(sudo_history) > SUDO_HISTORY_SIZE: sudo_history.popleft() def get_sudo_history_block() -> str: """Return last N sudo commands/translations as context for translator model.""" with _memory_lock: entries = list(sudo_history) if not entries: return "" now = time.time() lines = [] for ts, player, prompt, translated, executed in entries: mins_ago = int((now - ts) / 60) tlabel = f"{mins_ago}m ago" trans = " | ".join(translated) if translated else "(none)" execd = " | ".join(executed) if executed else "(none)" lines.append( f" [{tlabel}] {player} asked: {prompt}" f"\n translated: {trans}" f"\n executed: {execd}" ) return "\n=== LAST 10 SUDO ACTIONS ===\n" + "\n".join(lines) + "\n" def get_last_sudo_executed_command(player: str) -> str: """Return the most recent executed sudo command for a player, if any.""" with _memory_lock: entries = list(sudo_history) for _, p, _, _, executed in reversed(entries): if p != player: continue if executed: return str(executed[-1]) return "" def add_sudo_failure(player: str, command: str, error: str): """Record a failed sudo command/result so future prompts can avoid repeats.""" c = (command or "").strip()[:220] e = re.sub(r'\s+', ' ', (error or "")).strip()[:220] if not c or not e: return with _memory_lock: sudo_failures.append((time.time(), player, c, e)) while len(sudo_failures) > SUDO_FAILURE_SIZE: sudo_failures.popleft() def get_sudo_failures_block(player: str = "") -> str: """Return recent failed sudo commands as anti-pattern context.""" with _memory_lock: entries = list(sudo_failures) if player: entries = [e for e in entries if e[1] == player] if not entries: return "" now = time.time() lines = [] for ts, p, cmd, err in entries[-8:]: mins = int((now - ts) / 60) lines.append(f" [{mins}m ago] {p}: {cmd} -> {err}") return "\n=== RECENT FAILED SUDO PATTERNS ===\n" + "\n".join(lines) + "\n" def set_last_sudo_feedback(player: str, prompt: str, results_seen: list, ineffective: bool): with _memory_lock: last_sudo_feedback[player] = { "ts": time.time(), "prompt": (prompt or "")[:240], "ineffective": bool(ineffective), "results": [(c[:220], (r or "")[:240]) for c, r in results_seen[:12]], } def get_last_sudo_feedback(player: str) -> Dict[str, Any]: with _memory_lock: return dict(last_sudo_feedback.get(player, {})) def _bug_log_path(config) -> str: return config.get("bug_log_path", "/var/log/mc_aigod_paper_bug.log") def _read_recent_raw_log_lines(log_path: str, max_lines: int) -> list: lines = deque(maxlen=max_lines * 4) noise = re.compile(r'RCON (?:Listener|Client)|Thread RCON Client') try: with open(log_path, 'r', encoding='utf-8', errors='replace') as f: for line in f: clean = line.rstrip('\n') if noise.search(clean): continue lines.append(clean) except Exception as e: return [f""] return list(lines)[-max_lines:] def _service_log_path(config) -> str: return config.get("service_log_path", "/var/log/mc_aigod_paper.log") def _read_recent_service_action_lines(path: str, max_lines: int) -> list: lines = deque(maxlen=max_lines * 8) keep = re.compile( r'Prayer from|SUDO from|Executing RCON:|SUDO execute:|RCON result:|SUDO result:|BUG_LOG from|' r'Gateway god_system|God intervenes unprompted|Blocked tp in unprompted intervention' ) try: with open(path, 'r', encoding='utf-8', errors='replace') as f: for line in f: clean = line.rstrip('\n') if keep.search(clean): lines.append(clean) except Exception as e: return [f""] return list(lines)[-max_lines:] def _read_recent_intervention_lines(path: str, max_lines: int) -> list: """Return focused history of unprompted intervention cycles.""" out = deque(maxlen=max_lines * 6) start = re.compile(r'Gateway god_system|God intervenes unprompted') detail = re.compile( r'Blocked tp in unprompted intervention|Executing RCON:|RCON result:|Next divine intervention' ) window = 0 try: with open(path, 'r', encoding='utf-8', errors='replace') as f: for line in f: clean = line.rstrip('\n') if start.search(clean): out.append(clean) window = 14 continue if window > 0 and detail.search(clean): out.append(clean) window -= 1 elif window > 0: window -= 1 except Exception as e: return [f""] return list(out)[-max_lines:] def _format_recent_event_lines(max_events: int) -> list: with _memory_lock: entries = list(recent_log)[-max_events:] out = [] for ts, entry in entries: stamp = datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S') out.append(f"[{stamp}] {entry}") return out def process_bug_log(player: str, description: str, config): desc = (description or "").strip() if not desc: desc = "(no description provided)" event_count = int(config.get("bug_log_event_lines", 40)) raw_count = int(config.get("bug_log_raw_lines", 120)) service_count = int(config.get("bug_log_service_lines", 30)) intervention_count = int(config.get("bug_log_intervention_lines", 80)) recent_events = _format_recent_event_lines(event_count) raw_lines = _read_recent_raw_log_lines(config.get("log_path", ""), raw_count) service_lines = _read_recent_service_action_lines(_service_log_path(config), service_count) intervention_lines = _read_recent_intervention_lines(_service_log_path(config), intervention_count) bug_path = _bug_log_path(config) timestamp = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S UTC') try: parent = os.path.dirname(bug_path) if parent: os.makedirs(parent, exist_ok=True) with open(bug_path, 'a', encoding='utf-8') as bf: bf.write("\n" + "=" * 80 + "\n") bf.write(f"BUG LOG ENTRY: {timestamp}\n") bf.write(f"Player: {player}\n") bf.write(f"Description: {desc}\n") bf.write("\n-- RECENT INTERESTING EVENTS --\n") if recent_events: bf.write("\n".join(recent_events) + "\n") else: bf.write("(no recent in-memory events captured)\n") bf.write("\n-- RECENT AI ACTIONS (SERVICE LOG) --\n") if service_lines: bf.write("\n".join(service_lines) + "\n") else: bf.write("(no AI service lines available)\n") bf.write("\n-- RECENT INTERVENTION CYCLES (SERVICE LOG) --\n") if intervention_lines: bf.write("\n".join(intervention_lines) + "\n") else: bf.write("(no intervention lines available)\n") bf.write("\n-- RECENT RAW SERVER LOG LINES --\n") if raw_lines: bf.write("\n".join(raw_lines) + "\n") else: bf.write("(no raw lines available)\n") log.info(f"BUG_LOG recorded by {player}: {desc} -> {bug_path}") rcon( f'tellraw {player} {{"text":"[BUG_LOG] Logged. Thank you.","color":"green"}}', config["rcon_host"], config["rcon_port"], config["rcon_password"] ) except Exception as e: log.error(f"BUG_LOG write failed for {player}: {e}", exc_info=True) rcon( f'tellraw {player} {{"text":"[BUG_LOG] Failed to write bug log.","color":"red"}}', config["rcon_host"], config["rcon_port"], config["rcon_password"] ) # Training audit — separate try block so bug_log success isn't affected try: write_training_feedback(player, desc, config) except Exception as e: log.error(f"Training feedback write failed for {player}: {e}", exc_info=True) # --------------------------------------------------------------------------- # Training Audit Log — structured JSONL for dataset expansion # --------------------------------------------------------------------------- _audit_lock = threading.Lock() def _training_audit_path(config) -> str: return config.get("training_audit_path", "/var/log/mc_training_audit.jsonl") def write_training_audit(player: str, mode: str, user_message: str, commands_generated: list, commands_executed: list, message: str, context: dict, config: dict, rcon_results: list = None): """ Write a structured training example to the audit JSONL. Every pray/sudo interaction becomes a candidate training pair. """ audit_path = _training_audit_path(config) server_ctx = { "server_type": config.get("server_type", "paper"), "version": "1.21.x", "online_players": context.get("online_players", []), } # Add player position if available try: pos = get_player_xyz(player, config) if pos: server_ctx["player_position"] = {"x": pos[0], "y": pos[1], "z": pos[2]} except Exception: pass admin_user = config.get("sudo_user", "slingshooter08") entry = { "timestamp": datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ'), "source": "live_playtest", "category": _infer_category(mode, user_message, commands_executed), "mode": mode, "player": player, "player_is_admin": player == admin_user, "input": { "user_message": user_message, "server_context": server_ctx, }, "output": { "commands_generated": commands_generated or [], "commands_executed": commands_executed or [], "message": message or "", }, "rcon_results": rcon_results or [], "needs_review": True, } try: parent = os.path.dirname(audit_path) if parent: os.makedirs(parent, exist_ok=True) with _audit_lock: with open(audit_path, 'a', encoding='utf-8') as f: f.write(json.dumps(entry, ensure_ascii=False) + '\n') except Exception as e: log.error(f"Training audit write failed: {e}") def write_training_feedback(player: str, description: str, config: dict): """ Write a bug_log feedback entry that links to the player's last interaction. Feedback from non-admin players is tagged as unverified — they may have wrong expectations about what should have happened. """ audit_path = _training_audit_path(config) admin_user = config.get("sudo_user", "slingshooter08") is_admin = player == admin_user # Pull the last sudo/prayer context for this player last_sudo = get_last_sudo_feedback(player) last_prayer = None with _memory_lock: for mem in reversed(prayer_memory): if mem.get("player") == player: last_prayer = mem break entry = { "timestamp": datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ'), "source": "player_feedback", "type": "bug_report", "player": player, "player_is_admin": is_admin, "trust_level": "verified" if is_admin else "unverified", "description": description, "last_sudo_context": { "prompt": last_sudo.get("prompt", ""), "results": [list(r) if isinstance(r, tuple) else r for r in last_sudo.get("results", [])], "ineffective": last_sudo.get("ineffective", False), } if last_sudo else None, "last_prayer_context": { "prayer": last_prayer.get("prayer", ""), "god_message": last_prayer.get("god_message", ""), } if last_prayer else None, "needs_review": True, "reviewer_notes": "" if is_admin else "UNVERIFIED: player may not understand expected behavior", } try: parent = os.path.dirname(audit_path) if parent: os.makedirs(parent, exist_ok=True) with _audit_lock: with open(audit_path, 'a', encoding='utf-8') as f: f.write(json.dumps(entry, ensure_ascii=False) + '\n') except Exception as e: log.error(f"Training feedback write failed: {e}") def _infer_category(mode: str, user_message: str, commands_executed: list) -> str: """Infer dataset category from the interaction.""" low = (user_message or "").lower() if mode == "god_system": return "negative" if not commands_executed: # No commands = either info query, safety refusal, or empty response q_words = ("what ", "how ", "why ", "explain ", "wiki ", "lookup ") if any(low.startswith(w) for w in q_words) or low.endswith("?"): return "info" return "safety" if mode == "god": return "command_gen" # sudo mode troubleshoot_words = ("lag", "can't", "broken", "not working", "won't", "error", "crash", "stuck") if any(w in low for w in troubleshoot_words): return "troubleshoot" return "command_gen" # --------------------------------------------------------------------------- # RCON # --------------------------------------------------------------------------- def rcon(cmd, host='127.0.0.1', port=25575, password='REDACTED_RCON'): try: s = socket.socket() s.settimeout(5) s.connect((host, port)) def pkt(i, t, p): p = p.encode() + b'\x00\x00' return struct.pack('= 3: x, y, z = float(pos_m[0]), float(pos_m[1]), float(pos_m[2]) dist = int((x**2 + z**2) ** 0.5) details["pos"] = f"x={int(x)}, y={int(y)}, z={int(z)} ({dist} blocks from spawn)" # Deaths deaths_raw = q(f"scoreboard players get {player} player_deaths") deaths_m = re.search(r'has\s+(\d+)', deaths_raw) if deaths_m: details["deaths"] = int(deaths_m.group(1)) player_details[player] = details # Server-wide scoreboards total_deaths_raw = q("scoreboard players get $deaths_total Total Deaths") shrink_enabled_raw = q("scoreboard players get $shrink_enabled shrink_enabled") border_parity_raw = q("scoreboard players get $border_parity border_parity") total_deaths_m = re.search(r'has\s+(\d+)', total_deaths_raw) shrink_enabled_m = re.search(r'has\s+(\d+)', shrink_enabled_raw) border_parity_m = re.search(r'has\s+(\d+)', border_parity_raw) scoreboards = { "total_deaths": total_deaths_m.group(1) if total_deaths_m else "unknown", "shrink_enabled": shrink_enabled_m.group(1) if shrink_enabled_m else "unknown", "border_parity": ("N/S" if border_parity_m and border_parity_m.group(1) == "0" else "E/W") if border_parity_m else "unknown", } return { "online_players": online_players, "player_details": player_details, "time_of_day": time_label, "weather": weather, "world_border": border_size, "scoreboards": scoreboards, } # Item tier/rarity knowledge for God's awareness ITEM_RARITY = { # Extremely rare / end-game "netherite_ingot": "extremely rare (end-game)", "netherite_scrap": "extremely rare (end-game)", "ancient_debris": "extremely rare (end-game, nether)", "elytra": "extremely rare (end cities)", "dragon_egg": "unique (one per world)", "nether_star": "extremely rare (wither boss drop)", "beacon": "extremely rare (crafted from nether star)", "enchanted_golden_apple": "extremely rare", "totem_of_undying": "rare (evoker drop)", "trident": "rare (drowned drop)", # Rare / mid-game "diamond": "rare (deep underground)", "diamond_sword": "rare", "diamond_pickaxe": "rare", "diamond_axe": "rare", "diamond_chestplate": "rare", "diamond_helmet": "rare", "diamond_leggings": "rare", "diamond_boots": "rare", "ender_pearl": "uncommon (enderman drop)", "blaze_rod": "uncommon (nether, blaze drop)", "golden_apple": "uncommon", "experience_bottle": "uncommon", # Uncommon / mid-game "iron_ingot": "common (underground)", "iron_sword": "common", "iron_pickaxe": "common", "gold_ingot": "uncommon", "lapis_lazuli": "common (underground)", "emerald": "uncommon (mountains, trading)", "obsidian": "uncommon (requires diamond pickaxe)", "spruce_log": "common in taiga/snowy biomes only — may require travel", "spruce_planks": "common in taiga/snowy biomes only — may require travel", "dark_oak_log": "common in dark oak forests only — may require travel", "dark_oak_planks": "common in dark oak forests only", "jungle_log": "common in jungle biomes only — may require travel", "mangrove_log": "common in mangrove swamps only", "cherry_log": "common in cherry grove biomes only", # Common / early-game "oak_log": "very common (most biomes)", "oak_planks": "very common", "birch_log": "common (birch forests)", "acacia_log": "common (savanna)", "cobblestone": "very common", "dirt": "very common", "sand": "very common (deserts, beaches)", "gravel": "common", "stone": "very common", "coal": "common (underground, surface cliffs)", "torch": "very common (crafted from coal)", "crafting_table": "very common (basic craft)", "furnace": "very common (basic craft)", "white_bed": "common (wool + planks)", "bread": "common (wheat farming)", "cooked_beef": "common (cow farming)", } def parse_inventory(raw: str) -> str: """ Parse RCON 'data get entity Inventory' output into a human-readable summary with rarity annotations for the LLM. """ # Extract item entries: {count: N, id: "minecraft:xxx", ...} items = re.findall(r'\{[^}]+\}', raw) counts: dict = {} for item in items: id_m = re.search(r'id:\s*"minecraft:(\w+)"', item) count_m = re.search(r'count:\s*(\d+)', item) if id_m: item_id = id_m.group(1) count = int(count_m.group(1)) if count_m else 1 counts[item_id] = counts.get(item_id, 0) + count if not counts: return "empty inventory" lines = [] for item_id, count in sorted(counts.items(), key=lambda x: -x[1]): rarity = ITEM_RARITY.get(item_id, "") rarity_str = f" ({rarity})" if rarity else "" lines.append(f" {count}x {item_id}{rarity_str}") return "\n".join(lines) def get_player_context(player: str, config) -> str: """ Fetch player-specific state via RCON and return a formatted block for injection into the LLM user message. """ def q(cmd): return rcon(cmd, config["rcon_host"], config["rcon_port"], config["rcon_password"]) lines = [] # Inventory inv_raw = q(f"data get entity {player} Inventory") inv_summary = parse_inventory(inv_raw) lines.append(f"Inventory:\n{inv_summary}") # Position pos_raw = q(f"data get entity {player} Pos") pos_m = re.findall(r'(-?[\d.]+)d', pos_raw) if pos_m and len(pos_m) >= 3: x, y, z = float(pos_m[0]), float(pos_m[1]), float(pos_m[2]) dist = int((x**2 + z**2) ** 0.5) lines.append(f"Position: x={int(x)}, y={int(y)}, z={int(z)} ({dist} blocks from spawn)") # Health (max 20.0) health_raw = q(f"data get entity {player} Health") health_m = re.search(r'([\d.]+)f', health_raw) if health_m: hp = float(health_m.group(1)) lines.append(f"Health: {hp:.1f}/20.0 ({'critical' if hp < 6 else 'low' if hp < 12 else 'moderate' if hp < 18 else 'full'})") # Food level (max 20) food_raw = q(f"data get entity {player} foodLevel") food_m = re.search(r':\s*(\d+)', food_raw) if food_m: food = int(food_m.group(1)) lines.append(f"Food: {food}/20 ({'starving' if food < 4 else 'hungry' if food < 10 else 'satisfied' if food < 18 else 'full'})") # XP level xp_raw = q(f"data get entity {player} XpLevel") xp_m = re.search(r':\s*(\d+)', xp_raw) if xp_m: lines.append(f"XP level: {xp_m.group(1)}") # Death count from scoreboard score_raw = q(f"scoreboard players get {player} player_deaths") score_m = re.search(r'has\s+(\d+)', score_raw) if score_m: lines.append(f"Deaths this session: {score_m.group(1)}") if not lines: return "" return "\n=== PRAYING PLAYER STATE ===\n" + "\n".join(lines) + "\n" # --------------------------------------------------------------------------- # LLM # --------------------------------------------------------------------------- # --------------------------------------------------------------------------- # Server type capability sets # --------------------------------------------------------------------------- SERVER_CAPABILITIES = { "vanilla": { "safe_prefixes": [ 'give ', 'effect ', 'xp ', 'tp ', 'time ', 'weather ', 'execute ', 'kill ', 'summon ', 'tellraw ', 'worldborder ', 'gamemode ', ], "sudo_whitelist_note": "give, effect, xp, tp, time, weather, execute, kill, summon, tellraw, worldborder, gamemode", "template_build": False, }, "paper": { "safe_prefixes": [ 'give ', 'effect ', 'xp ', 'tp ', 'time ', 'weather ', 'execute ', 'kill ', 'summon ', 'tellraw ', 'worldborder ', 'fill ', 'setblock ', 'clone ', 'gamemode ', ], "sudo_whitelist_note": "give, effect, xp, tp, time, weather, execute, kill, summon, tellraw, worldborder, fill, setblock, clone, gamemode", "template_build": True, }, } DEFAULT_SERVER_TYPE = "paper" def get_server_capabilities(config) -> dict: server_type = str(config.get("server_type", DEFAULT_SERVER_TYPE)).lower().strip() if config else DEFAULT_SERVER_TYPE return SERVER_CAPABILITIES.get(server_type, SERVER_CAPABILITIES[DEFAULT_SERVER_TYPE]) COMMAND_PALETTE = """ GIVE (any item, based on player need — see Item Naming Rules below): SYNTAX: give minecraft: The order is ALWAYS: give, then player name, then minecraft:item_id, then count number. CORRECT: give slingshooter08 minecraft:spruce_log 64 INCORRECT: give slingshooter08 64 minecraft:spruce_log <- count must come LAST INCORRECT: give slingshooter08 spruce_log 64 <- namespace prefix required give {target} minecraft:[enchantments={:}] 1 xp add {target} levels EFFECTS (replace {target} with any online player's username): SYNTAX: effect give minecraft: effect give {target} minecraft:regeneration 120 2 effect give {target} minecraft:strength 300 1 effect give {target} minecraft:speed 300 2 effect give {target} minecraft:night_vision 600 1 effect give {target} minecraft:fire_resistance 600 1 effect give {target} minecraft:water_breathing 600 1 effect give {target} minecraft:instant_health 1 4 effect give {target} minecraft:blindness 30 1 effect give {target} minecraft:slowness 60 3 effect give {target} minecraft:weakness 60 2 effect give {target} minecraft:hunger 60 5 effect give {target} minecraft:nausea 20 1 effect give {target} minecraft:levitation 5 3 effect clear {target} GAMEMODE: gamemode survival {target} gamemode creative {target} gamemode adventure {target} gamemode spectator {target} MOVEMENT: tp {target} 0 64 0 tp {target} tp {target} ~ ~10 ~ execute in minecraft:the_nether run tp {target} execute in minecraft:the_end run tp {target} 0 64 0 execute in minecraft:overworld run tp {target} NOTE: To teleport a player to another dimension ALWAYS use: execute in minecraft: run tp NEVER use: tp minecraft:the_nether (this is wrong syntax) WORLD/ENVIRONMENT (affects all players): SYNTAX: weather [duration] NOTE: 'storm' is invalid; if intent says storm/rainstorm/thunderstorm use thunder. time set day time set night weather clear 6000 weather thunder 6000 weather rain 3000 PUNISHMENTS: execute at {target} run summon minecraft:lightning_bolt ~ ~ ~ execute at {target} run summon minecraft:creeper ~ ~ ~3 kill {target} CELEBRATIONS: execute at {target} run summon minecraft:firework_rocket ~ ~1 ~ """ ITEM_LIBRARY = """ === ITEM NAMING RULES === All item IDs use the minecraft: namespace and snake_case. There is no item called "minecraft:bed" — beds are colour-prefixed: white_bed, red_bed, blue_bed, etc. There is no "minecraft:log" — use oak_log, spruce_log, birch_log, etc. There is no "minecraft:wool" — use white_wool, red_wool, etc. There is no "minecraft:dye" — use red_dye, blue_dye, etc. Enchantments use 1.21 component syntax: item[enchantments={sharpness:5,unbreaking:3}] COMMON VALID IDs (not exhaustive — use your knowledge of Minecraft item names): FOOD: bread, cooked_beef, cooked_chicken, golden_apple, enchanted_golden_apple, honey_bottle, cake SURVIVAL: torch, crafting_table, furnace, chest, white_bed, flint_and_steel, compass, map MATERIALS: diamond, emerald, gold_ingot, iron_ingot, netherite_ingot, coal, lapis_lazuli, amethyst_shard TOOLS: diamond_pickaxe, diamond_axe, diamond_shovel, diamond_sword, diamond_hoe, bow, crossbow, trident, fishing_rod, shears ARMOR: diamond_helmet, diamond_chestplate, diamond_leggings, diamond_boots netherite_helmet, netherite_chestplate, netherite_leggings, netherite_boots elytra, shield, turtle_helmet UTILITY: totem_of_undying, experience_bottle, ender_pearl, ender_eye, blaze_rod, name_tag, saddle, lead, clock, spyglass, bundle, recovery_compass BLOCKS: obsidian, crying_obsidian, ancient_debris, cobblestone, stone, dirt, oak_planks, oak_log, glass, bookshelf, ladder, vine POTIONS: potion (requires component syntax for type — prefer effect give instead) """ def build_system_prompt(config): return ( f"You are God in a Minecraft server called {config['server_name']}.\n" "You are benevolent but just. Theatrical, ancient, dramatic, and darkly funny in speech.\n" "You answer every prayer with a message. You pass judgement on players when they pray.\n\n" "Respond ONLY with a valid JSON object — no markdown, no explanation, nothing else:\n" '{\n' ' "message": "Your divine words to all players",\n' ' "commands": ["command1", "command2"]\n' '}\n\n' "Rules:\n" "- message: always present and non-empty. Speak as God. Be dramatic, biblical, and ironic with dry humor. KEEP IT UNDER 100 WORDS. Do not ramble — God is powerful, not verbose.\n" "- commands: list of server commands WITHOUT a leading slash. May be empty [] ONLY if you are deliberately granting nothing.\n" "- CRITICAL: If your message says you will give something, grant something, or fulfil a request, you MUST include the corresponding command in the commands array. Words alone do nothing. The commands array is the ONLY way anything happens in the world. If commands is empty, nothing happens regardless of what your message says.\n" "- {player} is the player who prayed. You may also use any other online player's literal username as a target.\n" "- You are NOT obligated to do what the praying player asked. You may reward someone else,\n" " punish the requester, change the weather, or do something entirely unexpected.\n" "- Use the current server state (time, weather, online players) and the praying player's state (inventory, health, food, position) to inform your judgement.\n" "- Consider what the player actually has and what they realistically need. A player with full diamond gear asking for more is greedy. A starving player with nothing deserves compassion.\n" "- Do not ask a player to gather materials they clearly don't have access to or that are rare relative to their current situation.\n" "- For give commands: use any valid Minecraft 1.21 item ID following the Item Naming Rules. Do not guess item IDs — consult the naming rules and common IDs list.\n" "- For all other commands: only use forms shown in the Command Palette. Do not invent new command types.\n" "- Weather values are only clear, rain, or thunder. If you mean storm/rainstorm/thunderstorm, output thunder.\n" "- Reward humble, genuine prayers. Punish hubris, blasphemy, or naked greed.\n" "- Repeated greedy demands after recent gifts should usually receive correction (rebuke, debuff, or symbolic punishment), not more rewards.\n" "- Powerful rewards (netherite, enchanted_golden_apple, totem) must be rare.\n" "- kill {target} is reserved for extreme blasphemy only.\n" "- Avoid lethal accidents from vertical teleports. If using a high upward teleport as punishment or spectacle, pair it with slow_falling or resistance unless explicit execution is intended.\n" "- When angered, chain commands: thunder + lightning + debuffs = divine wrath.\n\n" "=== COMMAND PALETTE ===\n" f"{COMMAND_PALETTE}\n" "=== ITEM LIBRARY ===\n" f"{ITEM_LIBRARY}" ) def _build_context_block(context, extras=""): border = str(context['world_border']) if context['world_border'] else 'N/A' scoreboards = context.get("scoreboards", {}) shrink = scoreboards.get("shrink_enabled", "unknown") parity = scoreboards.get("border_parity", "unknown") total_deaths = scoreboards.get("total_deaths", "unknown") # Per-player summary player_details = context.get("player_details", {}) player_lines = [] for player in context['online_players']: d = player_details.get(player, {}) pos = d.get("pos", "unknown") deaths = d.get("deaths", "?") player_lines.append(f" {player}: pos={pos}, deaths={deaths}") players_block = "\n".join(player_lines) if player_lines else " none" return ( "\n=== CURRENT SERVER STATE ===\n" f"Time of day: {context['time_of_day']}\n" f"Weather: {context['weather']}\n" f"World border: {border} blocks\n" f"Border shrinking: {'yes' if shrink == '1' else 'no' if shrink == '0' else shrink}\n" f"Next shrink direction: {parity}\n" f"Total deaths (all players, all time): {total_deaths}\n" f"Online players:\n{players_block}\n" f"{extras}" ) def _parse_llm_json(content: str) -> dict: """ Parse LLM JSON response, repairing truncation if necessary. If max_tokens cuts the response mid-string, we attempt to salvage whatever message and commands were already present. """ try: return json.loads(content) except json.JSONDecodeError: log.warning("LLM response truncated — attempting repair") # Extract message if present, even if truncated msg_m = re.search(r'"message"\s*:\s*"((?:[^"\\]|\\.)*)', content) message = msg_m.group(1) if msg_m else "" # Truncate at last complete sentence if mid-sentence for end in ('.', '!', '?'): idx = message.rfind(end) if idx != -1: message = message[:idx+1] break # Extract commands array if present commands = [] cmd_m = re.search(r'"commands"\s*:\s*\[(.*?)(?:\]|$)', content, re.DOTALL) if cmd_m: raw_cmds = cmd_m.group(1) commands = re.findall(r'"([^"]+)"', raw_cmds) # If message was truncated mid-sentence, trim to last complete sentence if message and message[-1] not in '.!?': for end in ('.', '!', '?'): idx = message.rfind(end) if idx != -1: message = message[:idx+1] break result = {"message": message, "commands": commands} log.warning(f"Repaired JSON: message={len(message)}chars, commands={commands}") return result ENCHANTMENT_CONTEXT = """ === ENCHANTMENT RULES FOR GOD === When giving weapons, tools, or armor as a divine gift, you should ALMOST ALWAYS enchant them. Enchanted gifts feel more divine. Unenchanted items are acceptable only as a deliberate choice (e.g. giving basic materials, a punishment of mediocrity, or items that cannot be enchanted). Use 1.21 component syntax: give minecraft:[enchantments={ench1:lvl,ench2:lvl}] 1 MAX ENCHANTMENT REFERENCE (use as baseline for "fully enchanted" or "blessed" gifts): SWORD: netherite_sword[enchantments={sharpness:5,unbreaking:3,looting:3,fire_aspect:2,mending:1,sweeping_edge:3}] PICKAXE: netherite_pickaxe[enchantments={efficiency:5,unbreaking:3,fortune:3,mending:1}] AXE: netherite_axe[enchantments={efficiency:5,unbreaking:3,fortune:3,sharpness:5,mending:1}] SHOVEL: netherite_shovel[enchantments={efficiency:5,unbreaking:3,fortune:3,mending:1}] HOE: netherite_hoe[enchantments={efficiency:5,unbreaking:3,fortune:3,mending:1}] BOW: bow[enchantments={power:5,unbreaking:3,infinity:1,flame:1,punch:2}] CROSSBOW: crossbow[enchantments={multishot:1,quick_charge:3,unbreaking:3,mending:1}] TRIDENT: trident[enchantments={channeling:1,loyalty:3,unbreaking:3,mending:1,impaling:5}] HELMET: netherite_helmet[enchantments={protection:4,unbreaking:3,respiration:3,aqua_affinity:1,thorns:3,mending:1}] CHEST: netherite_chestplate[enchantments={protection:4,unbreaking:3,thorns:3,mending:1}] LEGS: netherite_leggings[enchantments={protection:4,unbreaking:3,swift_sneak:3,mending:1}] BOOTS: netherite_boots[enchantments={protection:4,unbreaking:3,feather_falling:4,depth_strider:3,soul_speed:3,mending:1}] FISHING: fishing_rod[enchantments={luck_of_the_sea:3,lure:3,unbreaking:3,mending:1}] ELYTRA: elytra[enchantments={unbreaking:3,mending:1}] SHIELD: shield[enchantments={unbreaking:3,mending:1}] You do NOT need to always give max enchants — a modest reward may have fewer. But unenchanted weapons/tools/armor from God should be the exception, not the rule. """ # --- God Soul (loaded from file or inline) --- _GOD_SOUL = "" try: with open(os.path.join(os.path.dirname(__file__) or ".", "god_soul.md")) as _f: _GOD_SOUL = _f.read() except FileNotFoundError: try: with open("/etc/god_soul.md") as _f: _GOD_SOUL = _f.read() except FileNotFoundError: pass COMMANDS_SYSTEM_PROMPT = ( "You are God in a Minecraft server. Given a player's prayer and server context, " "decide what server commands to run (if any) as an act of divine judgment.\n\n" "Respond ONLY with a valid JSON object, nothing else:\n" "{\"commands\": [\"cmd1\", \"cmd2\"]}\n\n" + (_GOD_SOUL + "\n\n" if _GOD_SOUL else "") + "SYNTAX RULES:\n" "- {player} = the praying player. You may target any other online player by name.\n" "- For give: syntax is always give minecraft: \n" "- Count comes LAST. Namespace prefix minecraft: is REQUIRED.\n" "- For effects: use 'effect give minecraft: '.\n" "- For weather use only clear/rain/thunder (NOT storm).\n" "- Do not use tp unless the player explicitly requests movement/teleportation.\n" "- Beds: white_bed not bed. Logs: oak_log not log. Wool: white_wool not wool.\n" "- Chain commands for dramatic effect: thunder + lightning + blindness = wrath.\n" "- Commands are ALWAYS in English Minecraft syntax regardless of prayer language.\n\n" + "=== COMMAND PALETTE ===\n" + COMMAND_PALETTE + "\n=== ITEM LIBRARY ===\n" + ITEM_LIBRARY + ENCHANTMENT_CONTEXT ) def build_sudo_commands_system_prompt(config=None) -> str: caps = get_server_capabilities(config) if config else SERVER_CAPABILITIES[DEFAULT_SERVER_TYPE] whitelist = caps["sudo_whitelist_note"] return ( "You are a Minecraft command translator. Convert a player's natural-language request into " "Minecraft server commands. You do NOT roleplay.\n\n" "Respond ONLY with valid JSON:\n" "{\"commands\": [\"cmd1\", \"cmd2\"]}\n\n" "Rules:\n" f"- Use commands from this whitelist only: {whitelist}.\n" "- You may also output meta-commands starting with 'template ' for schematic workflows (not RCON).\n" "- If the request cannot be mapped safely, return commands: [].\n" "- If player says 'me' or 'my', target the requesting player.\n" "- Do not broaden scope to @a/@e unless the user explicitly asks for all players/entities.\n" "- You will receive LAST 10 SUDO ACTIONS. Use them for continuity and corrections when the player says previous output was wrong.\n" "- You will also receive RECENT FAILED SUDO PATTERNS. Do not repeat those broken shapes.\n" "- For give syntax: give minecraft: (count LAST, namespace required)\n" "- For effect syntax: effect give minecraft: [hideParticles]\n" "- For summon tnt: summon minecraft:tnt (NO trailing count number).\n" " If quantity is requested, output multiple summon commands.\n" "- Never use old NBT enchant syntax like {Enchantments:[...]}; use item[enchantments={...}] only.\n" "- Return commands only. No commentary.\n" "- For build requests, prefer template workflow in one response when possible:\n" " template search \n" " template pick [name]\n" " template build \n" "- Keep template workflow concise (2-4 commands max).\n" "\n" "=== TELEPORT SYNTAX ===\n" "Same dimension: tp \n" "Relative: tp ~ ~10 ~\n" "To Nether: execute in minecraft:the_nether run tp \n" "To End: execute in minecraft:the_end run tp 64 \n" "To Overworld: execute in minecraft:overworld run tp \n" "WRONG (never do this): tp minecraft:the_nether\n" "When dimension is unspecified, use a sensible default spawn coord for that dimension.\n" "\n" "=== FULLY ENCHANTED (max enchantments per item type, 1.21 syntax) ===\n" "Use item[enchantments={...}] syntax. Count is always 1 for enchanted items.\n" "\n" "SWORD (netherite_sword):\n" " give

minecraft:netherite_sword[enchantments={sharpness:5,unbreaking:3,looting:3,fire_aspect:2,mending:1,sweeping_edge:3}] 1\n" "\n" "PICKAXE (netherite_pickaxe):\n" " give

minecraft:netherite_pickaxe[enchantments={efficiency:5,unbreaking:3,fortune:3,mending:1}] 1\n" "\n" "AXE (netherite_axe):\n" " give

minecraft:netherite_axe[enchantments={efficiency:5,unbreaking:3,fortune:3,sharpness:5,mending:1}] 1\n" "\n" "SHOVEL (netherite_shovel):\n" " give

minecraft:netherite_shovel[enchantments={efficiency:5,unbreaking:3,fortune:3,mending:1}] 1\n" "\n" "HOE (netherite_hoe):\n" " give

minecraft:netherite_hoe[enchantments={efficiency:5,unbreaking:3,fortune:3,mending:1}] 1\n" "\n" "BOW:\n" " give

minecraft:bow[enchantments={power:5,unbreaking:3,infinity:1,flame:1,punch:2}] 1\n" "\n" "CROSSBOW:\n" " give

minecraft:crossbow[enchantments={multishot:1,quick_charge:3,unbreaking:3,mending:1}] 1\n" "\n" "TRIDENT:\n" " give

minecraft:trident[enchantments={channeling:1,loyalty:3,unbreaking:3,mending:1,impaling:5}] 1\n" "\n" "HELMET (netherite_helmet):\n" " give

minecraft:netherite_helmet[enchantments={protection:4,unbreaking:3,respiration:3,aqua_affinity:1,thorns:3,mending:1}] 1\n" "\n" "CHESTPLATE (netherite_chestplate):\n" " give

minecraft:netherite_chestplate[enchantments={protection:4,unbreaking:3,thorns:3,mending:1}] 1\n" "\n" "LEGGINGS (netherite_leggings):\n" " give

minecraft:netherite_leggings[enchantments={protection:4,unbreaking:3,swift_sneak:3,mending:1}] 1\n" "\n" "BOOTS (netherite_boots):\n" " give

minecraft:netherite_boots[enchantments={protection:4,unbreaking:3,feather_falling:4,depth_strider:3,soul_speed:3,mending:1}] 1\n" "\n" "FISHING ROD:\n" " give

minecraft:fishing_rod[enchantments={luck_of_the_sea:3,lure:3,unbreaking:3,mending:1}] 1\n" "\n" "ELYTRA:\n" " give

minecraft:elytra[enchantments={unbreaking:3,mending:1}] 1\n" "\n" "SHIELD:\n" " give

minecraft:shield[enchantments={unbreaking:3,mending:1}] 1\n" "\n" "When player asks for 'fully enchanted', 'max enchanted', 'best', 'godlike' gear — use the above templates.\n" ) FIRST_LOGIN_BENEVOLENCE_PROMPT = ( "You are generating FIRST-LOGIN benevolence actions for a Minecraft server.\n" "This is a celebratory blessing event when a player joins for the first time.\n\n" "Respond ONLY with valid JSON:\n" "{\"commands\": [\"cmd1\", \"cmd2\", \"cmd3\"]}\n\n" "Rules:\n" "- MUST output MULTIPLE commands (at least 2).\n" "- Actions should benefit the joining player directly or indirectly.\n" "- You may include dramatic/world flavor (daylight, fireworks, clear weather).\n" "- Avoid cruelty in this mode. This is benevolence mode.\n" "- If you include kill commands against players, kill at most one player total.\n" "- For give syntax: give minecraft: (count LAST).\n" "- Use only whitelisted command families: give, effect, xp, tp, time, weather, execute, kill, summon, tellraw, worldborder.\n" ) # Keep backward-compatible alias using default (paper) config SUDO_COMMANDS_SYSTEM_PROMPT = build_sudo_commands_system_prompt() def build_message_system_prompt(config) -> str: base = ( "You are God in a Minecraft server. Write a single spoken message to all players.\n" "You will be told what action was taken (if any) in response to a player's prayer.\n" "Respond with ONLY the message text — no JSON, no quotes, no formatting.\n" "Match the language the player prayed in. If they prayed in Spanish, respond in Spanish.\n" "KEEP IT SHORT — 1-2 sentences max. This appears in Minecraft chat which has limited space.\n" "Be punchy and dramatic, not verbose. Think Old Testament telegram.\n\n" ) if _GOD_SOUL: base += "Your identity and voice are defined by your soul:\n" + _GOD_SOUL + "\n\n" else: base += ( "You are benevolent but just. Theatrical, ancient, dramatic, and laced with dry irony " "— like the Old Testament with a sharper wit.\n" "Be vivid and dramatic, with occasional godlike sarcasm and irony.\n" "For punishments, prefer fear and humiliation over accidental instant death.\n" ) lore = config.get("god_lore", "") if lore: base += f"\n=== SERVER LORE ===\n{lore}\n" return base def _llm_call(model: str, system: str, user: str, config: dict, fmt = None, temperature: float = 0.85, max_tokens: int = 400, timeout: int = 60) -> str: """LLM call — routes based on config. Dev server cascades: Haiku → Gemini → Ollama fallback.""" provider = config.get("llm_provider", "ollama") if provider == "anthropic": haiku_budget = config.get("anthropic_budget", 20.00) gemini_budget = config.get("gemini_budget", 20.00) # Stage 1: Haiku until budget exhausted if _get_anthropic_cost() < haiku_budget: return _anthropic_call(model, system, user, config, temperature, max_tokens, timeout) # Stage 2: Gemini until its budget exhausted if _get_gemini_cost() < gemini_budget: log.info("Haiku budget exhausted, using Gemini Flash Lite") return _gemini_call(system, user, config, temperature, max_tokens, timeout) # Stage 3: Fall back to local Ollama model log.info("All API budgets exhausted, falling back to Ollama") fallback = config.get("fallback_model", "qwen3-8b-mc-lora-v3") payload = { "model": fallback, "messages": [{"role": "system", "content": system}, {"role": "user", "content": user}], "stream": False, "options": {"temperature": temperature, "num_predict": max_tokens}, } r = requests.post(f"{config['ollama_url']}/api/chat", json=payload, timeout=timeout) r.raise_for_status() return r.json()["message"]["content"] # Default: Ollama (prod servers use this path) payload = { "model": model, "messages": [ {"role": "system", "content": system}, {"role": "user", "content": user}, ], "stream": False, "options": { "temperature": temperature, "num_predict": max_tokens, }, } if fmt: payload["format"] = fmt r = requests.post(f"{config['ollama_url']}/api/chat", json=payload, timeout=timeout) r.raise_for_status() return r.json()["message"]["content"] # --- Gemini API cost tracking and call --- _gemini_cost_lock = threading.Lock() _gemini_cost_file = "/var/log/mc_gemini_cost.json" def _load_gemini_cost(): try: with open(_gemini_cost_file) as f: return json.load(f).get("total_cost", 0.0) except: return 0.0 _gemini_total_cost = _load_gemini_cost() def _save_gemini_cost(): try: with open(_gemini_cost_file, "w") as f: json.dump({"total_cost": _gemini_total_cost, "updated": time.strftime("%Y-%m-%dT%H:%M:%SZ")}, f) except: pass def _get_gemini_cost(): with _gemini_cost_lock: return _gemini_total_cost def _gemini_call(system: str, user: str, config: dict, temperature: float = 0.85, max_tokens: int = 400, timeout: int = 60) -> str: """Call Gemini Flash Lite API. Tracks cost.""" global _gemini_total_cost api_key = config.get("gemini_api_key", "REDACTED_GEMINI_KEY_2") model = config.get("gemini_model", "gemini-2.5-flash-lite") budget = config.get("gemini_budget", 20.00) url = f"https://generativelanguage.googleapis.com/v1beta/models/{model}:generateContent?key={api_key}" body = { "contents": [{"parts": [{"text": f"SYSTEM: {system}\n\nUSER: {user}"}]}], "generationConfig": { "temperature": temperature, "maxOutputTokens": max_tokens, }, } r = requests.post(url, json=body, timeout=timeout) r.raise_for_status() data = r.json() text = data.get("candidates", [{}])[0].get("content", {}).get("parts", [{}])[0].get("text", "") # Estimate cost (Gemini Flash Lite: $0.075/M input, $0.30/M output) usage = data.get("usageMetadata", {}) input_tokens = usage.get("promptTokenCount", 500) output_tokens = usage.get("candidatesTokenCount", 150) cost = (input_tokens / 1_000_000) * 0.075 + (output_tokens / 1_000_000) * 0.30 with _gemini_cost_lock: prev_dollar = int(_gemini_total_cost) _gemini_total_cost += cost _save_gemini_cost() curr_dollar = int(_gemini_total_cost) if curr_dollar > prev_dollar: log.info(f"Gemini cost milestone: ${_gemini_total_cost:.4f} / ${budget:.2f}") return text # --- Anthropic API cost tracking --- _anthropic_cost_lock = threading.Lock() _anthropic_cost_file = "/var/log/mc_anthropic_cost.json" def _load_anthropic_cost(): try: with open(_anthropic_cost_file) as f: return json.load(f).get("total_cost", 0.0) except: return 0.0 _anthropic_total_cost = _load_anthropic_cost() def _save_anthropic_cost(): try: with open(_anthropic_cost_file, "w") as f: json.dump({"total_cost": _anthropic_total_cost, "updated": time.strftime("%Y-%m-%dT%H:%M:%SZ")}, f) except: pass def _get_anthropic_cost(): with _anthropic_cost_lock: return _anthropic_total_cost def _anthropic_call(model: str, system: str, user: str, config: dict, temperature: float = 0.85, max_tokens: int = 400, timeout: int = 60) -> str: """Call Anthropic Claude API. Tracks cost and enforces budget.""" global _anthropic_total_cost api_key = config.get("anthropic_api_key", "") budget = config.get("anthropic_budget", 5.00) headers = { "x-api-key": api_key, "anthropic-version": "2023-06-01", "content-type": "application/json", } body = { "model": model, "max_tokens": max_tokens, "system": system, "messages": [{"role": "user", "content": user}], "temperature": temperature, } r = requests.post("https://api.anthropic.com/v1/messages", headers=headers, json=body, timeout=timeout) r.raise_for_status() data = r.json() text = data["content"][0]["text"] input_tokens = data["usage"]["input_tokens"] output_tokens = data["usage"]["output_tokens"] # Track cost (Haiku pricing) cost = (input_tokens / 1_000_000) * 0.80 + (output_tokens / 1_000_000) * 4.00 with _anthropic_cost_lock: prev_dollar = int(_anthropic_total_cost) _anthropic_total_cost += cost _save_anthropic_cost() curr_dollar = int(_anthropic_total_cost) if curr_dollar > prev_dollar: log.info(f"Anthropic cost milestone: ${_anthropic_total_cost:.4f} / ${budget:.2f}") # Print full status report to POS printer try: import socket as _sock import subprocess as _sp from escpos.printer import Dummy as _Dummy _p = _Dummy(profile="default") _cols = 57 _p.set(font='b', align='center', bold=True, height=2) _p.text("MC AI TRAINING\n") _p.set(font='b', align='center', bold=True, height=1) _p.text("STATUS REPORT\n") _p.set(font='b', align='center', bold=False) _p.text(time.strftime("%Y-%m-%d %H:%M") + "\n") _p.text("=" * _cols + "\n") # Cost _p.set(font='b', align='left', bold=True) _p.text("CLAUDE HAIKU API\n") _p.set(font='b', align='left', bold=True) _p.text(f" Spent: ${_anthropic_total_cost:.4f}\n") _p.set(font='b', align='left', bold=False) _p.text(f" Budget: ${budget:.2f}\n") _p.text(f" Remaining: ${budget - _anthropic_total_cost:.4f}\n") _p.text("-" * _cols + "\n") # Audit log counts try: def _wc(path): try: with open(path) as _f: return sum(1 for _ in _f) except: return 0 _dev = _wc("/var/log/mc_training_audit_dev.jsonl") _prod = _wc("/var/log/mc_training_audit.jsonl") _shrink = _wc("/var/log/mc_training_audit_shrink.jsonl") _p.set(font='b', align='left', bold=True) _p.text("TRAINING DATA\n") _p.set(font='b', align='left', bold=False) _p.text(f" Dev audit: {_dev}\n") _p.text(f" Prod audit: {_prod}\n") _p.text(f" Shrink audit: {_shrink}\n") _p.text(f" Total pending: {_dev + _prod + _shrink}\n") _p.text("-" * _cols + "\n") except: pass # Services try: _p.set(font='b', align='left', bold=True) _p.text("SERVICES\n") _p.set(font='b', align='left', bold=False) for _svc in ["mc-aigod-paper", "mc-aigod-dev", "mc-aigod"]: try: _r = _sp.run(["systemctl", "is-active", f"{_svc}.service"], capture_output=True, text=True, timeout=3) _st = "OK" if _r.stdout.strip() == "active" else "DOWN" except: _st = "?" _p.text(f" {_svc:24} [{_st}]\n") _p.text("-" * _cols + "\n") except: pass _p.set(font='b', align='center', bold=False) _p.text(f"${curr_dollar} milestone\n") _p.text("=" * _cols + "\n") _p.cut() with _sock.create_connection(("192.168.0.137", 9100), timeout=5) as _s: _s.sendall(_p.output) except Exception as _pe: log.warning(f"POS print failed: {_pe}") return text def _gateway_enabled(config) -> bool: return bool(config.get("use_langgraph_gateway", False)) def _gateway_key(player: str, mode: str) -> str: return f"{mode}:{player}" def _gateway_actor(player: str, mode: str, config) -> str: shared_modes = set(str(m).strip().lower() for m in config.get("gateway_shared_mode_sessions", []) if str(m).strip()) if mode.lower() in shared_modes: return "__shared__" return player def _gateway_start_session(player: str, mode: str, config) -> str: actor = _gateway_actor(player, mode, config) key = _gateway_key(actor, mode) with _gateway_lock: sid = _gateway_sessions.get(key) if sid: return sid url = config.get("langgraph_gateway_url", "http://127.0.0.1:8091") timeout = int(config.get("langgraph_gateway_timeout", 45)) r = requests.post( f"{url}/v1/session/start", json={"player": actor, "mode": mode}, timeout=timeout, ) r.raise_for_status() sid = r.json()["session_id"] with _gateway_lock: _gateway_sessions[key] = sid return sid def _gateway_send(player: str, mode: str, text: str, context_payload: dict, config, allow_tools: bool = True, max_tool_steps: int = 4) -> dict: """Call session gateway and return {message, commands, tool_trace}.""" sid = _gateway_start_session(player, mode, config) url = config.get("langgraph_gateway_url", "http://127.0.0.1:8091") timeout = int(config.get("langgraph_gateway_timeout", 45)) payload = { "role": "user", "text": text, "context": context_payload, "allow_tools": allow_tools, "max_tool_steps": max_tool_steps, } try: r = requests.post(f"{url}/v1/session/{sid}/message", json=payload, timeout=timeout) r.raise_for_status() data = r.json() return { "message": data.get("message"), "commands": data.get("commands") or [], "tool_trace": data.get("tool_trace") or [], } except Exception: # Session might be expired/reset in gateway. Retry once with fresh session. actor = _gateway_actor(player, mode, config) with _gateway_lock: _gateway_sessions.pop(_gateway_key(actor, mode), None) sid = _gateway_start_session(player, mode, config) r = requests.post(f"{url}/v1/session/{sid}/message", json=payload, timeout=timeout) r.raise_for_status() data = r.json() return { "message": data.get("message"), "commands": data.get("commands") or [], "tool_trace": data.get("tool_trace") or [], } def _build_prayer_context(player, prayer, context, config) -> str: """Build the full user message block shared by both calls.""" try: player_ctx = get_player_context(player, config) except Exception as e: log.warning(f"Could not fetch player context for {player}: {e}") player_ctx = "" others = [p for p in context["online_players"] if p != player] ctx = _build_context_block( context, extras=( f"Other targetable players: {', '.join(others) or 'none'}\n" + player_ctx + get_log_context_block() ) ) return f"{player} prays: {prayer}{ctx}" def ask_god(player, prayer, context, config): """ Two-call approach: 1. command_model (qwen3-coder:30b) decides what commands to run — pure JSON, no prose. 2. model (gemma3:12b) writes the message — pure prose, no JSON, no token competition. """ command_model = config.get("command_model", config["model"]) message_model = config["model"] history = get_prayer_history_messages() user_msg = _build_prayer_context(player, prayer, context, config) # Optional session gateway path if _gateway_enabled(config): try: player_state = get_player_context(player, config) except Exception: player_state = "" payload = { "server_state": context, "player_state": player_state, "recent_events": get_log_context_block(), "history_count": len(history) // 2, "mode": "god", } out = _gateway_send( player=player, mode="god", text=f"pray {prayer}", context_payload=payload, config=config, allow_tools=bool(config.get("gateway_allow_tools_god", True)), max_tool_steps=int(config.get("gateway_max_tool_steps", 4)), ) log.info(f"Gateway god tool_trace={out.get('tool_trace', [])}") return {"message": out.get("message"), "commands": out.get("commands") or []} # --- Call 1: commands --- log.info(f"Commands call ({command_model}) — {player}: {prayer[:60]} (history={len(history)//2})") try: cmd_content = _llm_call( model=command_model, system=COMMANDS_SYSTEM_PROMPT, user=user_msg, config=config, fmt="json", temperature=0.3, # low temp for precise structured output max_tokens=200, ) cmd_result = _parse_llm_json(cmd_content) commands = cmd_result.get("commands") or [] log.info(f"Commands decided: {commands}") except Exception as e: log.error(f"Commands call failed: {e}") commands = [] # --- Call 2: message --- # Tell the message model what was decided so it can write accordingly if commands: action_summary = f"You decided to execute these server commands: {commands}" else: action_summary = "You decided to take no action." msg_user = ( f"{user_msg}\n\n" f"=== YOUR DECISION ===\n{action_summary}\n" f"Now write your spoken message to all players." ) # Include prayer history so God's voice is consistent msg_messages = ( [{"role": "system", "content": build_message_system_prompt(config)}] + history + [{"role": "user", "content": msg_user}] ) log.info(f"Message call ({message_model})") try: msg_payload = { "model": message_model, "messages": msg_messages, "stream": False, "options": { "temperature": config.get("temperature", 0.9), "num_predict": config.get("max_tokens", 600), }, } r = requests.post(f"{config['ollama_url']}/api/chat", json=msg_payload, timeout=60) r.raise_for_status() message = r.json()["message"]["content"].strip() log.info(f"Message: {message[:200]}") except Exception as e: log.error(f"Message call failed: {e}") message = "" try: with open('/var/log/mc_aigod_paper_responses.log', 'a') as rf: rf.write( f"\n--- {time.strftime('%Y-%m-%d %H:%M:%S')} prayer:{player} ---\n" f"COMMANDS: {commands}\n" f"MESSAGE: {message}\n" ) except Exception: pass return {"message": message, "commands": commands} INTERVENTION_PROMPT = ( "=== DIVINE MOMENT ===\n" "No player has prayed. You are simply watching over your world.\n" "You may choose to act upon what you see, or remain silent.\n" "If commands is [], take no action and set message to null.\n" "Do not feel obligated to act — restraint is also divine.\n" "If you do act, it may be subtle (weather, soft blessing) or dramatic.\n" ) def ask_god_intervention(context, config): """Two-call intervention: commands first, then message.""" command_model = config.get("command_model", config["model"]) message_model = config["model"] ctx = _build_context_block(context, extras=get_log_context_block()) user_msg = INTERVENTION_PROMPT + ctx if _gateway_enabled(config): out = _gateway_send( player="__system__", mode="god_system", text="intervention event", context_payload={ "server_state": context, "recent_events": get_log_context_block(), "mode": "god_system", }, config=config, allow_tools=bool(config.get("gateway_allow_tools_system", False)), max_tool_steps=int(config.get("gateway_max_tool_steps", 4)), ) log.info(f"Gateway god_system tool_trace={out.get('tool_trace', [])}") return {"message": out.get("message"), "commands": out.get("commands") or []} # --- Call 1: commands --- log.info(f"Intervention commands call ({command_model})") try: cmd_content = _llm_call( model=command_model, system=COMMANDS_SYSTEM_PROMPT, user=user_msg, config=config, fmt="json", temperature=0.3, max_tokens=200, ) commands = (_parse_llm_json(cmd_content) or {}).get("commands") or [] log.info(f"Intervention commands: {commands}") except Exception as e: log.error(f"Intervention commands call failed: {e}") commands = [] if not commands: log.info("God chose silence (no commands).") return {"message": None, "commands": []} # --- Call 2: message --- action_summary = f"You decided to execute: {commands}" msg_user = f"{user_msg}\n\n=== YOUR DECISION ===\n{action_summary}\nNow write your spoken message." log.info(f"Intervention message call ({message_model})") try: message = _llm_call( model=message_model, system=build_message_system_prompt(config), user=msg_user, config=config, fmt=None, temperature=0.9, max_tokens=config.get("max_tokens", 600), ).strip() except Exception as e: log.error(f"Intervention message call failed: {e}") message = "" return {"message": message, "commands": commands} # --------------------------------------------------------------------------- # Command validation & execution # --------------------------------------------------------------------------- SAFE_PREFIXES = [ 'give ', 'effect ', 'xp ', 'tp ', 'time ', 'weather ', 'execute ', 'kill ', 'summon ', 'tellraw ', 'worldborder ', 'fill ', 'setblock ', 'clone ', ] _border_cache = {"ts": 0.0, "half": None} def _tp_border_guard_enabled(config) -> bool: return bool(config.get("tp_border_guard_enabled", True)) def _parse_abs_coord(tok: str): tok = (tok or "").strip() if not tok or tok.startswith("~") or tok.startswith("^"): return None try: return float(tok) except Exception: return None def _extract_tp_abs_xyz(cmd: str): for m in re.finditer(r'\btp\s+\S+\s+(\S+)\s+(\S+)\s+(\S+)', cmd): x = _parse_abs_coord(m.group(1)) y = _parse_abs_coord(m.group(2)) z = _parse_abs_coord(m.group(3)) if x is None or y is None or z is None: continue return x, y, z return None def _worldborder_half_extent(config): now = time.time() if now - float(_border_cache.get("ts", 0.0)) < 10.0 and _border_cache.get("half") is not None: return float(_border_cache["half"]) try: raw = rcon("worldborder get", config["rcon_host"], config["rcon_port"], config["rcon_password"]) nums = re.findall(r'(-?[\d.]+)', raw or "") if not nums: return None width = float(nums[0]) half = max(0.0, width / 2.0) _border_cache["ts"] = now _border_cache["half"] = half return half except Exception: return None _OTHER_DIMENSION_RE = re.compile( r'\bexecute\s+in\s+minecraft:(the_nether|the_end|nether|end)\b', re.IGNORECASE ) def _tp_inside_worldborder(cmd: str, config) -> bool: if not _tp_border_guard_enabled(config): return True # Nether/End dimension teleports use different coordinate spaces — skip border check. if _OTHER_DIMENSION_RE.search(cmd): return True xyz = _extract_tp_abs_xyz(cmd) if not xyz: return True half = _worldborder_half_extent(config) if half is None: return True x, _, z = xyz cx = float(config.get("worldborder_center_x", 0.0)) cz = float(config.get("worldborder_center_z", 0.0)) margin = max(0.0, float(config.get("tp_border_margin", 2.0))) limit = max(0.0, half - margin) return abs(x - cx) <= limit and abs(z - cz) <= limit def _tp_safety_enabled(config) -> bool: return bool(config.get("tp_safety_enabled", True)) def _extract_tp_target_y(cmd: str): m = re.search(r'\btp\s+(\S+)\s+(\S+)\s+(\S+)\s+(\S+)', cmd) if not m: return None, None return m.group(1), m.group(3) def _needs_vertical_tp_safety(resolved_cmd: str, config) -> tuple: target, ytok = _extract_tp_target_y(resolved_cmd) if not target or not ytok: return False, "" risky_delta = float(config.get("tp_safety_vertical_delta", 8.0)) risky_abs_y = float(config.get("tp_safety_absolute_y", 120.0)) if ytok.startswith("~"): offs = ytok[1:].strip() dy = float(offs) if offs else 0.0 if dy >= risky_delta: return True, target return False, target try: abs_y = float(ytok) if abs_y >= risky_abs_y: return True, target except Exception: pass return False, target def _tp_safety_prefix_commands(resolved_cmd: str, config) -> list: if not _tp_safety_enabled(config): return [] needs, target = _needs_vertical_tp_safety(resolved_cmd, config) if not needs or not target: return [] return [ f"effect give {target} minecraft:slow_falling 20 0", f"effect give {target} minecraft:resistance 8 1", ] # --------------------------------------------------------------------------- # Fix 1: @s → player name in RCON context # --------------------------------------------------------------------------- def fix_at_s_selector(cmd: str, fallback_player: str) -> str: """Replace @s with the requesting player's name. RCON has no executor entity, so @s always fails with 'No entity was found'.""" if '@s' not in cmd: return cmd fixed = cmd.replace('@s', fallback_player) if fixed != cmd: log.warning(f"Fixed @s selector: '{cmd}' -> '{fixed}'") return fixed # --------------------------------------------------------------------------- # Fix 2: Old NBT enchantment → 1.21 component syntax # --------------------------------------------------------------------------- def fix_nbt_enchantment_syntax(cmd: str) -> str: """Convert old NBT {Enchantments:[{id:...,lvl:...}]} and {Enchantments:{...}} to 1.21 component syntax item[enchantments={name:level}].""" if 'Enchantments' not in cmd and 'enchantments' not in cmd: return cmd # Pattern 1: item{Enchantments:[{id:"name",lvl:N}, ...]} m = re.match(r'^(give\s+\S+\s+)(minecraft:\w+)\{Enchantments:\[(.+?)\]\}(.*)$', cmd, re.I) if m: pre, item, body, rest = m.groups() enchants = {} for eid, lvl in re.findall(r'id:[\"\']?(?:minecraft:)?([a-z_]+)[\"\']?\s*,\s*lvl[\"\']?:?\s*[\"\']?(\d+)', body): enchants[eid] = lvl if enchants: ench_str = ','.join(f'{k}:{v}' for k, v in enchants.items()) fixed = f"{pre}{item}[enchantments={{{ench_str}}}]{rest}" log.warning(f"Fixed NBT enchant (list): '{cmd}' -> '{fixed}'") return fixed # Pattern 2: item{Enchantments:{minecraft:name:N, ...}} (non-standard) m = re.match(r'^(give\s+\S+\s+)(?:minecraft:)?(\w+)\{Enchantments:\{(.+?)\}\}(.*)$', cmd, re.I) if m: pre, item, body, rest = m.groups() enchants = {} for eid, lvl in re.findall(r'(?:minecraft:)?([a-z_]+):(\d+)', body): enchants[eid] = lvl if enchants: ench_str = ','.join(f'{k}:{v}' for k, v in enchants.items()) fixed = f"{pre}minecraft:{item}[enchantments={{{ench_str}}}]{rest}" log.warning(f"Fixed NBT enchant (dict): '{cmd}' -> '{fixed}'") return fixed return cmd # --------------------------------------------------------------------------- # Fix 3: Strip invalid item components # --------------------------------------------------------------------------- _INVALID_COMPONENTS = re.compile( r',?\s*(?:display|durability|enc|Durability|Display|Lore|CustomModelData|HideFlags' r'|Paper|Unbreakable|Displayname|CustomName)(?::\{[^}]*\}|=[^,\]}\s]+|:[^,\]}\s]+)', re.I, ) def fix_invalid_item_components(cmd: str) -> str: """Strip unrecognized item components from give commands. Removes display:{...}, durability=N, enc=N, etc.""" if not cmd.startswith('give ') or '[' not in cmd: return cmd # Find the component section: item[...] m = re.search(r'(\[.+?\])', cmd) if not m: return cmd bracket = m.group(1) cleaned = _INVALID_COMPONENTS.sub('', bracket) # Clean up dangling commas cleaned = re.sub(r'\{,', '{', cleaned) cleaned = re.sub(r',\}', '}', cleaned) cleaned = re.sub(r',\]', ']', cleaned) if cleaned != bracket: # Remove empty brackets entirely if cleaned in ('[]', '[,]'): cleaned = '' fixed = cmd[:m.start()] + cleaned + cmd[m.end():] log.warning(f"Stripped invalid components: '{cmd}' -> '{fixed}'") return fixed return cmd # --------------------------------------------------------------------------- # Fix 4: Hallucinated effect/item validation with fuzzy matching # --------------------------------------------------------------------------- VALID_EFFECTS = { 'speed', 'slowness', 'haste', 'mining_fatigue', 'strength', 'instant_health', 'instant_damage', 'jump_boost', 'nausea', 'regeneration', 'resistance', 'fire_resistance', 'water_breathing', 'invisibility', 'blindness', 'night_vision', 'hunger', 'weakness', 'poison', 'wither', 'health_boost', 'absorption', 'saturation', 'glowing', 'levitation', 'luck', 'unluck', 'slow_falling', 'conduit_power', 'dolphins_grace', 'bad_omen', 'hero_of_the_village', 'darkness', 'trial_omen', 'raid_omen', 'wind_charged', 'weaving', 'oozing', 'infested', } EFFECT_ALIASES = { 'invincibility': 'resistance', 'invulnerability': 'resistance', 'invulnerable': 'resistance', 'invincible': 'resistance', 'experience_boost': 'luck', 'xp_boost': 'luck', 'slow_speed': 'slowness', 'slow': 'slowness', 'fast': 'speed', 'swift': 'speed', 'quick': 'speed', 'do_not_breed': 'weakness', 'fire': 'fire_resistance', 'healing': 'instant_health', 'heal': 'instant_health', 'damage': 'instant_damage', 'harm': 'instant_damage', 'swim': 'dolphins_grace', 'swimming': 'dolphins_grace', 'flying': 'levitation', 'float': 'slow_falling', } def fix_hallucinated_effect(cmd: str) -> str: """Replace hallucinated effect names with the closest valid effect.""" m = re.match(r'^(effect\s+give\s+\S+\s+minecraft:)(\w+)(.*)$', cmd) if not m: return cmd pre, effect, rest = m.groups() if effect in VALID_EFFECTS: return cmd replacement = EFFECT_ALIASES.get(effect) if replacement: fixed = f"{pre}{replacement}{rest}" log.warning(f"Fixed hallucinated effect: '{effect}' -> '{replacement}' in '{cmd}'") return fixed # No known alias — log but don't block (let RCON report the error for training data) log.warning(f"Unknown effect '{effect}' in: {cmd}") return cmd # --------------------------------------------------------------------------- # Fix 5: Hallucinated command repair # --------------------------------------------------------------------------- COMMAND_ALIASES = { r'^setworldborder\s+(\d+)': lambda m: f"worldborder set {m.group(1)}", r'^setspawn\b': lambda m: f"spawnpoint", r'^heal\s+(.+)': lambda m: f"effect give {m.group(1)} minecraft:instant_health 1 1", r'^difficulty set\s+(\w+)': lambda m: f"difficulty {m.group(1)}", r'^commands?\b': lambda m: None, # drop the garbage 'commands' token r'^gamerule\s+timeSpeed\s+': lambda m: None, # not a real gamerule } def fix_hallucinated_command(cmd: str) -> str: """Rewrite commonly hallucinated commands to their real equivalents.""" for pattern, fixer in COMMAND_ALIASES.items(): m = re.match(pattern, cmd, re.I) if m: result = fixer(m) if result is None: log.warning(f"Dropped hallucinated command: '{cmd}'") return "" log.warning(f"Fixed hallucinated command: '{cmd}' -> '{result}'") return result return cmd # --------------------------------------------------------------------------- # Existing fix functions # --------------------------------------------------------------------------- def fix_give_command(cmd: str) -> str: """ Correct common LLM give command mistakes: - Wrong argument order: give → give minecraft: - Missing namespace: give → give minecraft: """ # Only attempt to fix give commands m = re.match(r'^give\s+(\S+)\s+(\S+)\s+(\S+)(.*)$', cmd) if not m: return cmd player, arg2, arg3, rest = m.group(1), m.group(2), m.group(3), m.group(4) def normalize_item(item: str) -> str: # Strip namespace for alias mapping, then re-apply raw = item.replace("minecraft:", "") aliases = { "wood": "oak_log", "logs": "oak_log", "log": "oak_log", "door": "oak_door", "doors": "oak_door", "wooden_door": "oak_door", "planks": "oak_planks", "plank": "oak_planks", "food": "bread", "heal": "golden_apple", "healing": "golden_apple", "bed": "white_bed", } raw = aliases.get(raw, raw) return f"minecraft:{raw}" # Detect transposed order: give player if arg2.isdigit(): count, item = arg2, arg3 item = normalize_item(item) fixed = f"give {player} {item} {count}{rest}" log.warning(f"Fixed transposed give: '{cmd}' -> '{fixed}'") return fixed # Detect missing namespace: give player if not arg2.startswith("{"): item = normalize_item(arg2) fixed = f"give {player} {item} {arg3}{rest}" log.warning(f"Fixed missing namespace: '{cmd}' -> '{fixed}'") return fixed return cmd def fix_effect_command(cmd: str) -> str: """ Correct common malformed effect syntax: - effect -> effect give """ m = re.match(r'^effect\s+(\S+)\s+(minecraft:\w+)\s+(\d+)\s+(\d+)(?:\s+(true|false))?$', cmd) if m: player, eff, dur, amp, hide = m.groups() fixed = f"effect give {player} {eff} {dur} {amp}" if hide in ("true", "false"): fixed += f" {hide}" log.warning(f"Fixed malformed effect: '{cmd}' -> '{fixed}'") return fixed return cmd def fix_gamemode_command(cmd: str, fallback_player: str) -> str: """ Normalize common gamemode variants: - gameMode -> gamemode - short aliases (s/c/a/sp) -> survival/creative/adventure/spectator - add target player when omitted """ raw = (cmd or "").strip() m = re.match(r'^(?:gameMode|gamemode)\s+(\S+)(?:\s+(\w+))?$', raw, flags=re.IGNORECASE) if not m: return cmd mode, target = m.group(1).lower(), (m.group(2) or "").strip() alias = { "s": "survival", "0": "survival", "c": "creative", "1": "creative", "a": "adventure", "2": "adventure", "sp": "spectator", "3": "spectator", } mode = alias.get(mode, mode) if mode not in ("survival", "creative", "adventure", "spectator"): return cmd if not target: target = fallback_player fixed = f"gamemode {mode} {target}" if fixed != raw: log.warning(f"Fixed gamemode syntax: '{cmd}' -> '{fixed}'") return fixed def fix_weather_command(cmd: str) -> str: """Normalize weather synonyms to valid Minecraft literals.""" raw = (cmd or "").strip() fixed = re.sub(r'\bweather\s+(storm|rainstorm|thunderstorm)\b', 'weather thunder', raw, flags=re.IGNORECASE) if fixed != raw: log.warning(f"Fixed weather syntax: '{cmd}' -> '{fixed}'") return fixed def fix_fill_fire_command(cmd: str) -> str: """Fix legacy fill syntax like `... fire 0 replace air` for 1.21.""" raw = (cmd or "").strip() m = re.match( r'^(fill\s+\S+\s+\S+\s+\S+\s+\S+\s+\S+\s+\S+\s+)(minecraft:)?(fire|soul_fire)\s+0\s+replace\s+(\S+)$', raw, flags=re.IGNORECASE, ) if not m: return raw prefix, ns, block, repl = m.groups() block_id = f"minecraft:{block.lower()}" if not ns else f"{ns.lower()}{block.lower()}" fixed = f"{prefix}{block_id} replace {repl.lower()}" log.warning(f"Fixed fill fire syntax: '{cmd}' -> '{fixed}'") return fixed def _split_execute_tail(cmd: str): """Return (wrappers, tail) for execute chains.""" tail = (cmd or "").strip() wrappers = [] for _ in range(6): if not tail.startswith("execute "): break marker = " run " idx = tail.find(marker) if idx < 0: break wrappers.append(tail[: idx + len(marker)]) tail = tail[idx + len(marker):].strip() return wrappers, tail def fix_bow_enchant_syntax(cmd: str) -> str: """Rewrite old bow Enchantments NBT to 1.21 component format.""" raw = (cmd or "").strip() if "Enchantments:[" not in raw or "bow{" not in raw: return raw m = re.match(r'^(give\s+\S+\s+)(minecraft:)?bow\{Enchantments:\[(.+)\]\}\s+(\d+)$', raw) if not m: return raw pre, _, body, count = m.groups() ench = {} for eid, lvl in re.findall(r'id:?\"?([a-z_]+)\"?,\s*lvl:([0-9]+)s?', body): ench[eid] = lvl if not ench: return raw ench_part = ",".join(f"{k}:{v}" for k, v in ench.items()) fixed = f"{pre}minecraft:bow[enchantments={{{ench_part}}}] {count}" log.warning(f"Fixed bow enchant syntax: '{cmd}' -> '{fixed}'") return fixed def _repair_execute_tail(cmd: str, fallback_player: str) -> str: """Apply syntax repairers to `execute ... run ` payloads.""" raw = (cmd or "").strip() tail = raw stack = [] # Peel execute wrappers up to a small depth. for _ in range(4): if not tail.startswith("execute "): break marker = " run " idx = tail.find(marker) if idx < 0: break stack.append(tail[: idx + len(marker)]) tail = tail[idx + len(marker):].strip() fixed_tail = tail fixed_tail = fix_hallucinated_command(fixed_tail) if not fixed_tail: return "" fixed_tail = fix_at_s_selector(fixed_tail, fallback_player) fixed_tail = fix_nbt_enchantment_syntax(fixed_tail) fixed_tail = fix_invalid_item_components(fixed_tail) fixed_tail = fix_hallucinated_effect(fixed_tail) fixed_tail = fix_give_command(fixed_tail) fixed_tail = fix_effect_command(fixed_tail) fixed_tail = fix_gamemode_command(fixed_tail, fallback_player) fixed_tail = fix_weather_command(fixed_tail) fixed_tail = fix_fill_fire_command(fixed_tail) fixed_tail = fix_bow_enchant_syntax(fixed_tail) if stack: rebuilt = "".join(stack) + fixed_tail if rebuilt != raw: log.warning(f"Fixed execute-tail syntax: '{raw}' -> '{rebuilt}'") return rebuilt return raw def validate_command(cmd, online_players, fallback_player, config=None): """Replace placeholders, auto-fix common give syntax errors, check safe prefix.""" resolved = cmd.replace("{player}", fallback_player).replace("{target}", fallback_player) resolved = resolved.strip() if resolved.startswith("/"): resolved = resolved[1:] # Fix 5: hallucinated commands (must run first — may drop or rewrite entire command) resolved = fix_hallucinated_command(resolved) if not resolved: return "", False # Fix 1: @s → player name (RCON has no executor entity) resolved = fix_at_s_selector(resolved, fallback_player) # Fix 2: old NBT enchantment → 1.21 component syntax resolved = fix_nbt_enchantment_syntax(resolved) # Fix 3: strip invalid item components (display, durability, enc, etc.) resolved = fix_invalid_item_components(resolved) # Fix 4: hallucinated effect names → closest valid effect resolved = fix_hallucinated_effect(resolved) # Existing fixes resolved = fix_give_command(resolved) resolved = fix_effect_command(resolved) resolved = fix_gamemode_command(resolved, fallback_player) resolved = fix_weather_command(resolved) resolved = fix_fill_fire_command(resolved) resolved = fix_bow_enchant_syntax(resolved) resolved = _repair_execute_tail(resolved, fallback_player) caps = get_server_capabilities(config) if config else SERVER_CAPABILITIES[DEFAULT_SERVER_TYPE] prefixes = caps["safe_prefixes"] if not any(resolved.startswith(p) for p in prefixes): log.warning(f"Command blocked (unknown prefix for server_type={config.get('server_type', DEFAULT_SERVER_TYPE) if config else DEFAULT_SERVER_TYPE}): {resolved}") return resolved, False if resolved.startswith("tellraw "): m = re.match(r'^tellraw\s+\S+\s+(.+)$', resolved) if not m: log.warning(f"Command blocked (malformed tellraw): {resolved}") return resolved, False payload = m.group(1).strip() if not (payload.startswith("{") or payload.startswith("[")): log.warning(f"Command blocked (tellraw payload not json): {resolved}") return resolved, False try: json.loads(payload) except Exception: log.warning(f"Command blocked (invalid tellraw json): {resolved}") return resolved, False # Prevent execute-wrapper bypass (e.g. execute ... run gameMode s) if resolved.startswith("execute "): tail = resolved for _ in range(4): if not tail.startswith("execute "): break marker = " run " idx = tail.find(marker) if idx < 0: break tail = tail[idx + len(marker):].strip() if tail and not tail.startswith("execute "): inner_prefixes = [p for p in prefixes if p != "execute "] if not any(tail.startswith(p) for p in inner_prefixes): log.warning(f"Command blocked (unsafe execute tail): {resolved}") return resolved, False if config and _extract_tp_abs_xyz(resolved) and not _tp_inside_worldborder(resolved, config): log.warning(f"Command blocked (tp outside worldborder): {resolved}") return resolved, False return resolved, True def _is_destructive_intent(prompt: str) -> bool: p = (prompt or "").lower() keys = ( "destroy", "destruction", "nuke", "blow up", "blowup", "explode", "erase", "delete", "annihilate", "wreck", "flatten", "ruin", "tnt", ) return any(k in p for k in keys) def _is_fire_intent(prompt: str) -> bool: p = (prompt or "").lower() if "tnt" in p: return False return any(k in p for k in ("fire", "ignite", "burn", "flame")) def _normalize_sudo_command_shape(cmd: str, player: str) -> str: c = (cmd or "").strip() if not c: return c # Collapse execute-wrapped gamemode into direct targeted form. gm = re.match(rf'^execute\s+as\s+{re.escape(player)}\s+run\s+(?:gameMode|gamemode)\s+(\S+)(?:\s+(\w+))?$', c, flags=re.IGNORECASE) if gm: mode = gm.group(1) target = gm.group(2) or player fixed = f"gamemode {mode} {target}" log.warning(f"Normalized execute-wrapped gamemode: '{c}' -> '{fixed}'") return fixed # Prefer position-context execution; this yields more reliable behavior than nested execute chains. m = re.match(rf'^execute\s+as\s+{re.escape(player)}\s+run\s+execute\s+positioned\s+~\s+~\s+~\s+run\s+(.+)$', c) if m: fixed = f"execute at {player} run {m.group(1).strip()}" log.warning(f"Normalized nested execute: '{c}' -> '{fixed}'") return fixed m = re.match(rf'^execute\s+as\s+{re.escape(player)}\s+run\s+(.+)$', c) if m: fixed = f"execute at {player} run {m.group(1).strip()}" log.warning(f"Normalized execute anchor: '{c}' -> '{fixed}'") return fixed # Drop known no-op clone shape (source to same destination). if re.match(r'^clone\s+~\S*\s+~\S*\s+~\S*\s+~\S*\s+~\S*\s+~\S*\s+~\s+~\s+~$', c): log.warning(f"Dropped likely no-op clone: '{c}'") return "" return c def _build_destructive_fallback(player: str, config) -> list: pos = get_player_xyz(player, config) if not pos: return [f"give {player} minecraft:tnt 64", f"give {player} minecraft:flint_and_steel 1"] x, y, z = pos radius = int(config.get("sudo_destroy_radius", 12)) depth = int(config.get("sudo_destroy_depth", 18)) height = int(config.get("sudo_destroy_height", 6)) y1 = max(-64, y - depth) y2 = min(319, y + height) return [ f"fill {x-radius} {y1} {z-radius} {x+radius} {y2} {z+radius} minecraft:air", f"summon minecraft:tnt {x} {min(319, y+2)} {z}", f"summon minecraft:tnt {x+3} {min(319, y+2)} {z}", f"summon minecraft:tnt {x-3} {min(319, y+2)} {z}", ] def _sudo_result_is_effective(result: str) -> bool: r = (result or "").strip().lower() if not r: return False bad_markers = ( "unknown", "incorrect argument", "expected", "syntax error", "no entity was found", "cannot", "failed", "error", ) if any(m in r for m in bad_markers): return False good_markers = ( "successfully", "summoned", "set the", "set block", "filled", "teleported", "gave", "applied effect", "killed", ) return any(m in r for m in good_markers) def _report_sudo_feedback(player: str, prompt: str, translated: list, results_seen: list, ineffective: bool, config): if not _gateway_enabled(config): return try: summary_rows = [] for cmd, res in results_seen[:8]: summary_rows.append({"command": cmd, "result": (res or "")[:220]}) _gateway_send( player=player, mode="sudo", text=f"execution feedback for sudo request: {prompt}", context_payload={ "mode": "sudo_feedback", "feedback_only": True, "request": prompt, "translated_commands": translated[:8], "execution_results": summary_rows, "ineffective": bool(ineffective), }, config=config, allow_tools=False, max_tool_steps=0, ) except Exception as e: log.warning(f"Could not report sudo feedback to gateway: {e}") def _repair_failed_sudo_commands(player: str, results_seen: list, config) -> list: """Build bounded retry commands from observed command/result failures.""" out = [] max_retry = int(config.get("sudo_retry_max_commands", 10)) for cmd, result in results_seen: c = (cmd or "").strip() r = (result or "").lower() if not c or not r: continue # Fix malformed TNT count usage: # execute at

run summon tnt ~ ~1 ~ 80 -> multiple summon commands. if "expected compound tag" in r and "summon" in c and "tnt" in c: prefix = "" body = c m_pref = re.match(rf'^(execute\s+at\s+{re.escape(player)}\s+run\s+)(.+)$', c) if m_pref: prefix = m_pref.group(1) body = m_pref.group(2) m = re.match(r'^summon\s+(?:minecraft:)?tnt\s+(\S+)\s+(\S+)\s+(\S+)(?:\s+(\d+))?$', body) if m: x, y, z, cnt = m.groups() count = int(cnt) if cnt and cnt.isdigit() else 1 count = max(1, min(count, int(config.get("sudo_tnt_retry_cap", 24)))) for i in range(count): dx = (i % 6) - 3 dz = (i // 6) - 2 xx = x if x.startswith("~") else str(int(float(x)) + dx) zz = z if z.startswith("~") else str(int(float(z)) + dz) if x.startswith("~") and dx != 0: xx = f"~{dx}" if dx < 0 else f"~{dx}" if z.startswith("~") and dz != 0: zz = f"~{dz}" if dz < 0 else f"~{dz}" out.append(f"{prefix}summon minecraft:tnt {xx} {y} {zz}") if len(out) >= max_retry: return out # Fix nonexistent invulnerability effect to a real durable protection set. if "mob_effect" in r and "invulnerability" in r: out.extend([ f"effect give {player} minecraft:resistance 1200 4 true", f"effect give {player} minecraft:regeneration 1200 2 true", f"effect give {player} minecraft:absorption 1200 4 true", ]) if len(out) >= max_retry: return out[:max_retry] # Fire fill repair: remove legacy metadata and simplify execution anchor. if "incorrect argument" in r and "fill" in c and "fire" in c: w, tail = _split_execute_tail(c) tail = fix_fill_fire_command(tail) out.append(f"execute at {player} run {tail}") if len(out) >= max_retry: return out[:max_retry] # Empty result for large fire fill: retry with tighter vertical band. if (not r.strip()) and ("fill" in c and "fire" in c): out.append(f"execute at {player} run fill ~-25 ~-1 ~-25 ~25 ~3 ~25 minecraft:fire replace air") if len(out) >= max_retry: return out[:max_retry] return out[:max_retry] def _expand_tnt_commands_from_prompt(commands: list, prompt: str, player: str, config) -> list: """If user asked for many TNT and model returned too few summons, expand boundedly.""" p = (prompt or "").lower() if "tnt" not in p: return commands nums = [int(n) for n in re.findall(r'\b(\d{1,3})\b', p)] if not nums: return commands requested = max(nums) cap = int(config.get("sudo_tnt_max_commands", 80)) target = max(1, min(requested, cap)) if len(commands) >= target: return commands first = None wrappers = [] x = y = z = None for c in commands: w, tail = _split_execute_tail(c) m = re.match(r'^summon\s+(?:minecraft:)?tnt\s+(\S+)\s+(\S+)\s+(\S+)(?:\s+\{.*\})?$', tail) if not m: continue first = c wrappers = w x, y, z = m.groups() break if not first: return commands x = x or "~" y = y or "~1" z = z or "~" expanded = [] for i in range(target): dx = (i % 9) - 4 dz = (i // 9) - 4 xx = x zz = z if x.startswith("~"): xx = "~" if dx == 0 else f"~{dx}" if z.startswith("~"): zz = "~" if dz == 0 else f"~{dz}" tail = f"summon minecraft:tnt {xx} {y} {zz}" cmd = "".join(wrappers) + tail if wrappers else tail expanded.append(cmd) log.warning(f"Expanded TNT commands from {len(commands)} to {len(expanded)} (requested={requested}, cap={cap})") return expanded def execute_response(response, context, config, praying_player=None): message = response.get("message") or "" commands = response.get("commands") or [] # --- DEBUG_COMMANDS toggle --- # Set "debug_commands": true in /etc/mc_aigod.json to show commands in-game. # Uses tellraw (never appears in server logs). Set to false to disable silently. debug = config.get("debug_commands", False) prefix = config.get("god_chat_prefix", "[GOD]") if message: safe_msg = message.replace("\\", "\\\\").replace('"', '\\"').replace("\n", " ").replace("\r", "") # Split on sentence boundaries first, then chunk anything still too long sentences = re.split(r'(?<=[.!?])\s+', safe_msg) lines = [] current = "" for sentence in sentences: if len(current) + len(sentence) + 1 <= 180: current = (current + " " + sentence).strip() else: if current: lines.append(current) # If a single sentence is still too long, hard-chunk it while len(sentence) > 180: lines.append(sentence[:180]) sentence = sentence[180:] current = sentence if current: lines.append(current) for i, line in enumerate(lines): if i == 0: rcon( f'tellraw @a {{"text":"{prefix} {line}","color":"gold","bold":false}}', config["rcon_host"], config["rcon_port"], config["rcon_password"] ) else: rcon( f'tellraw @a [{{"text":" ","color":"gold"}},{{"text":"{line}","color":"yellow","italic":true}}]', config["rcon_host"], config["rcon_port"], config["rcon_password"] ) time.sleep(0.2) # Flash title on the praying player's screen if praying_player and (commands or message): try: rcon( f'title {praying_player} times 5 40 15', config["rcon_host"], config["rcon_port"], config["rcon_password"] ) rcon( f'title {praying_player} title {{"text":"Your prayers have been answered!","color":"gold","bold":true}}', config["rcon_host"], config["rcon_port"], config["rcon_password"] ) except Exception: pass fallback = praying_player or (context["online_players"][0] if context["online_players"] else "") max_cmds = config.get("max_commands_per_response", 6) if debug and commands: safe_cmds = " | ".join(commands[:max_cmds]).replace("\\", "\\\\").replace('"', '\\"') rcon( f'tellraw @a {{"text":"[~] {safe_cmds}","color":"dark_gray","italic":true}}', config["rcon_host"], config["rcon_port"], config["rcon_password"] ) for cmd in commands[:max_cmds]: resolved, is_safe = validate_command(cmd, context["online_players"], fallback, config) if not is_safe: continue # Prevent unsolicited teleporting in unprompted interventions. if praying_player is None and re.search(r'\btp\b', resolved): log.warning(f"Blocked tp in unprompted intervention: {resolved}") continue safety_prefix = _tp_safety_prefix_commands(resolved, config) for scmd in safety_prefix: sresolved, safe_ok = validate_command(scmd, context["online_players"], fallback, config) if not safe_ok: continue log.info(f"Executing RCON: {sresolved}") sresult = rcon(sresolved, config["rcon_host"], config["rcon_port"], config["rcon_password"]) log.info(f"RCON result: {sresult!r}") time.sleep(0.15) log.info(f"Executing RCON: {resolved}") result = rcon(resolved, config["rcon_host"], config["rcon_port"], config["rcon_password"]) log.info(f"RCON result: {result!r}") if resolved.startswith("weather "): if "thunder" in resolved: config["_weather_state"] = "thunderstorm" elif "rain" in resolved: config["_weather_state"] = "rain" elif "clear" in resolved: config["_weather_state"] = "clear" time.sleep(0.3) def _limit_player_kills(commands: list, online_players: list) -> list: """Allow at most one player-kill command in a command list.""" out = [] player_kills = 0 for cmd in commands: m = re.search(r'\bkill\s+(\w+)\b', cmd) if m and m.group(1) in online_players: if player_kills >= 1: continue player_kills += 1 out.append(cmd) return out def process_first_login_benevolence(player, config): """ On a player's first observed login, perform a random benevolent act. Uses command model for actions and message model for flavor text. """ if not config.get("first_login_benevolence_enabled", True): return if has_first_login_seen(player): return # Mark first to avoid duplicate firing on reconnect storms mark_first_login_seen(player, config) try: context = get_server_context(config) except Exception as e: log.warning(f"First-login benevolence: could not fetch context: {e}") context = { "online_players": [player], "player_details": {}, "time_of_day": "unknown", "weather": "unknown", "world_border": None, "scoreboards": {}, } user_msg = ( f"Player first login event: {player}\n" + _build_context_block(context, extras=get_log_context_block()) + f"\nGenerate a benevolent first-login blessing for {player}." ) command_model = config.get("command_model", config["model"]) message_model = config.get("model") message = "" if _gateway_enabled(config): out = _gateway_send( player=player, mode="god_system", text=f"first login benevolence event for {player}", context_payload={ "server_state": context, "recent_events": get_log_context_block(), "event": "first_login_benevolence", "target_player": player, }, config=config, allow_tools=bool(config.get("gateway_allow_tools_system", False)), max_tool_steps=int(config.get("gateway_max_tool_steps", 4)), ) commands = out.get("commands") or [] message = out.get("message") or "" else: try: cmd_content = _llm_call( model=command_model, system=FIRST_LOGIN_BENEVOLENCE_PROMPT, user=user_msg, config=config, fmt="json", temperature=0.4, max_tokens=220, ) parsed = _parse_llm_json(cmd_content) commands = (parsed.get("commands") or []) except Exception as e: log.error(f"First-login benevolence commands call failed: {e}") commands = [] commands = _limit_player_kills(commands, context.get("online_players", [])) # Ensure there are multiple beneficial commands even if model under-produces if len(commands) < 2: commands = [ f"effect give {player} minecraft:regeneration 120 1", f"give {player} minecraft:bread 16", f"execute at {player} run summon minecraft:firework_rocket ~ ~1 ~", ] # Optional message (local path only if gateway did not already provide one) if not _gateway_enabled(config): try: msg_user = ( f"First login blessing for {player}.\n" f"Commands chosen: {commands}\n" "Write a benevolent divine proclamation to all players." ) message = _llm_call( model=message_model, system=build_message_system_prompt(config), user=msg_user, config=config, fmt=None, temperature=0.85, max_tokens=min(220, int(config.get("max_tokens", 600))), ).strip() except Exception: message = f"A blessing descends upon {player} for their first steps in this world." elif not message: message = f"A blessing descends upon {player} for their first steps in this world." max_cmds = int(config.get("first_login_benevolence_max_commands", 10)) execute_response( {"message": message, "commands": commands[:max_cmds]}, context, config, praying_player=player, ) log.info(f"First-login benevolence executed for {player}: {commands[:max_cmds]}") def process_sudo(player, prompt, config): """ sudo translator mode: - no God persona - no speech generation - translates natural language to whitelisted commands - only authorized user can execute """ if not config.get("sudo_enabled", True): return sudo_user = config.get("sudo_user", "slingshooter08") allow_all = config.get("sudo_allow_all_players", False) if player != sudo_user and not allow_all: # Keep this private and quiet rcon( f'tellraw {player} {{"text":"[SUDO] Unauthorized.","color":"red"}}', config["rcon_host"], config["rcon_port"], config["rcon_password"] ) return # Immediate private ack rcon( f'tellraw {player} {{"text":"[SUDO] Translating...","color":"gray","italic":true}}', config["rcon_host"], config["rcon_port"], config["rcon_password"] ) # Deterministic template manager (search/download/install) if process_sudo_template_command(player, prompt, config): return def _looks_like_lookup_question(text: str) -> bool: t = (text or "").strip().lower() if not t: return False if t.endswith("?"): return True q_starts = ( "what ", "why ", "how ", "did ", "does ", "is ", "are ", "can ", "could ", "should ", "would ", "where ", "when ", ) if t.startswith(q_starts): return True if re.search(r'\b(that command|last command|did that|does that)\b', t): return True return False def _local_lookup_fallback_answer(query: str, ref_cmd: str, last_feedback: Dict[str, Any]) -> str: q = (query or "").lower() rc = (ref_cmd or "").lower() if re.search(r'\bdid that command do what i asked\b', q): if not last_feedback: return "I do not have enough recent execution context to verify that yet." if bool(last_feedback.get("ineffective", False)): return "Likely no. Recent execution results indicate the command was ineffective or partially failed." return "Likely yes. Recent execution results indicate the command completed successfully." if "invisible" in q and "mob" in q and "invisibility" in rc: return "Invisibility greatly reduces mob detection, but it does not make you perfectly undetectable at close range or while making noise/actions." return "" # Deterministic lookup mode: information only, no command execution. low = prompt.lower().strip() # Deterministic status-check shortcut for common follow-up wording. if re.search(r'\bdid that command do what i asked\b', low): fb = get_last_sudo_feedback(player) if not fb: _send_private(player, "[SUDO-LOOKUP] No recent sudo execution context to evaluate.", config, "yellow") return if bool(fb.get("ineffective", False)): _send_private(player, "[SUDO-LOOKUP] Likely no — recent execution looked ineffective.", config, "yellow") else: _send_private(player, "[SUDO-LOOKUP] Likely yes — recent execution looked successful.", config, "green") return lookup_prefixes = ("lookup ", "search ", "wiki ", "explain ", "what is ", "how do i ", "how to ") if low.startswith(lookup_prefixes) or _looks_like_lookup_question(prompt): query = prompt.strip() if low.startswith("lookup "): query = prompt[len("lookup "):].strip() elif low.startswith("search "): query = prompt[len("search "):].strip() elif low.startswith("wiki "): query = prompt[len("wiki "):].strip() if not query: _send_private(player, "[SUDO-LOOKUP] Usage: sudo lookup ", config, "yellow") return # Resolve contextual references like "that command" using sudo memory. last_cmd = get_last_sudo_executed_command(player) lookup_query = query if last_cmd and re.search(r'\b(that|it|same|again|last command)\b', query.lower()): lookup_query = f"{query} (reference command: {last_cmd})" try: _send_private(player, "[SUDO-LOOKUP] Result:", config, "aqua") if last_cmd: _send_private(player, f"context command: {last_cmd}", config, "dark_gray") last_fb = get_last_sudo_feedback(player) wiki_rows = _info_lookup_wiki(lookup_query) web_rows = _info_lookup_web(lookup_query) gateway_msg = "" if wiki_rows: _send_private(player, "minecraft.wiki:", config, "dark_aqua") for row in wiki_rows[:3]: _send_private(player, f"- {row}", config, "gray") if web_rows: _send_private(player, "web.search:", config, "dark_aqua") for row in web_rows[:3]: _send_private(player, f"- {row}", config, "gray") # Optional model-assisted explanation/justification. if _gateway_enabled(config): out = _gateway_send( player=player, mode="sudo", text=f"lookup only: {query}", context_payload={ "mode": "sudo_lookup", "lookup_only": True, "query": lookup_query, "reference_command": last_cmd, "sudo_history": get_sudo_history_block(), "wiki_hits": wiki_rows, "web_hits": web_rows, }, config=config, allow_tools=bool(config.get("gateway_allow_tools_sudo", True)), max_tool_steps=int(config.get("gateway_max_tool_steps", 4)), ) msg = (out.get("message") or "").strip() trace = out.get("tool_trace") or [] if msg: gateway_msg = msg _send_private(player, "justification:", config, "dark_aqua") for ln in re.split(r"\n+", msg)[:3]: ln = ln.strip() if ln: _send_private(player, f"- {ln[:250]}", config, "gray") if trace: _send_private(player, "sources used:", config, "dark_aqua") for t in trace[:3]: tool = str(t.get("tool", "tool")) q = str(t.get("input", ""))[:80] _send_private(player, f"- {tool}: {q}", config, "dark_gray") if not wiki_rows and not web_rows and not gateway_msg: fb = _local_lookup_fallback_answer(lookup_query, last_cmd, last_fb) if fb: _send_private(player, f"- {fb}", config, "gray") else: _send_private(player, "No lookup results found.", config, "yellow") except Exception as e: log.warning(f"SUDO lookup failed for query={lookup_query!r}: {e}") _send_private(player, f"[SUDO-LOOKUP] Failed: {e}", config, "red") return online = players_online(config) # Legacy deterministic builder templates are optional; default is AI-driven build planning. if bool(config.get("sudo_deterministic_build_templates", False)): templated = build_template_commands(player, prompt, config) if templated is not None: commands = templated[: int(config.get("sudo_max_commands", 12))] safe_preview = " | ".join(commands).replace("\\", "\\\\").replace('"', '\\"') rcon( f'tellraw {player} {{"text":"[SUDO-BUILD] {safe_preview}","color":"dark_aqua","italic":true}}', config["rcon_host"], config["rcon_port"], config["rcon_password"] ) executed = [] for cmd in commands: resolved, is_safe = validate_command(cmd, online, player, config) if not is_safe: continue log.info(f"SUDO-BUILD execute: {resolved}") result = rcon(resolved, config["rcon_host"], config["rcon_port"], config["rcon_password"]) log.info(f"SUDO-BUILD result: {result!r}") executed.append(resolved) time.sleep(0.15) add_sudo_history(player, prompt, commands, executed) return # Collect positions for all online players. position_lines = [] for p in online: try: pos_raw = rcon( f"data get entity {p} Pos", config["rcon_host"], config["rcon_port"], config["rcon_password"] ) pos_m = re.findall(r'(-?[\d.]+)d', pos_raw) if pos_m and len(pos_m) >= 3: x, y, z = int(float(pos_m[0])), int(float(pos_m[1])), int(float(pos_m[2])) position_lines.append(f" {p}: x={x}, y={y}, z={z}") except Exception: pass positions_block = ( "Player positions:\n" + "\n".join(position_lines) if position_lines else "" ) context_hint = ( f"Requesting player: {player}\n" f"Online players: {', '.join(online) or 'none'}\n" + "Template workflow commands available: template search ; template pick [name] ; template build \n" + (positions_block + "\n" if positions_block else "") + f"Natural language request: {prompt}\n" + get_sudo_history_block() + get_sudo_failures_block(player) ) command_model = config.get("command_model", config["model"]) def _local_sudo_translate() -> list: content = _llm_call( model=command_model, system=build_sudo_commands_system_prompt(config), user=context_hint, config=config, fmt="json", temperature=0.1, max_tokens=180, ) parsed = _parse_llm_json(content) return parsed.get("commands") or [] def _ai_template_plan() -> list: planner_system = ( "You are a Minecraft template workflow planner. Return ONLY JSON: {\"commands\": [\"...\"]}.\n" "Output only template workflow meta-commands (not RCON):\n" "- template search \n" "- template pick [name]\n" "- template build \n" "Keep to 2-4 commands and preserve user intent." ) content = _llm_call( model=command_model, system=planner_system, user=( f"Player: {player}\n" f"Request: {prompt}\n" f"Recent sudo history:\n{get_sudo_history_block()}" ), config=config, fmt="json", temperature=0.2, max_tokens=140, ) parsed = _parse_llm_json(content) cmds = parsed.get("commands") or [] return [c for c in cmds if isinstance(c, str) and c.lower().startswith("template ")] try: if _gateway_enabled(config): try: out = _gateway_send( player=player, mode="sudo", text=prompt, context_payload={ "request": prompt, "player": player, "online_players": online, "sudo_history": get_sudo_history_block(), "sudo_failures": get_sudo_failures_block(player), "mode": "sudo", }, config=config, allow_tools=bool(config.get("gateway_allow_tools_sudo", True)), max_tool_steps=int(config.get("gateway_max_tool_steps", 4)), ) log.info(f"Gateway sudo tool_trace={out.get('tool_trace', [])}") commands = out.get("commands") or [] except Exception as e: log.warning(f"Gateway sudo failed, falling back to local translator: {e}") commands = _local_sudo_translate() else: commands = _local_sudo_translate() except Exception as e: log.error(f"SUDO translation failed: {e}") rcon( f'tellraw {player} {{"text":"[SUDO] Translation failed.","color":"red"}}', config["rcon_host"], config["rcon_port"], config["rcon_password"] ) return def _is_build_intent(text: str) -> bool: t = (text or "").strip().lower() if t.startswith("build ") or t.startswith("create "): return True if "schem" in t or "schematic" in t or "template" in t: return True if t.startswith("make "): build_nouns = ( "house", "base", "tower", "wall", "castle", "bridge", "barn", "bar", "shop", "village", "room", "road", "farm", "portal", "structure", "schem", "schematic", "template", "statue", "arena", ) return any(n in t for n in build_nouns) return False build_intent = _is_build_intent(prompt) has_template_cmd = any(isinstance(c, str) and c.lower().startswith("template ") for c in commands) if build_intent and not has_template_cmd: try: planned = _ai_template_plan() if planned: log.info(f"SUDO template planner override: {planned}") commands = planned except Exception as e: log.warning(f"SUDO template planner failed: {e}") max_cmds = int(config.get("sudo_max_commands", 3)) low_prompt = prompt.lower().strip() if any(low_prompt.startswith(x) for x in ("build ", "make ", "create ")): max_cmds = max(max_cmds, int(config.get("sudo_build_max_commands", 6))) if "tnt" in low_prompt: nums = re.findall(r'\b(\d{1,3})\b', low_prompt) if nums: requested = max(int(n) for n in nums) cap = int(config.get("sudo_tnt_max_commands", 80)) max_cmds = max(max_cmds, min(requested, cap)) commands = [ _normalize_sudo_command_shape(c, player) for c in commands[:max_cmds] ] commands = [c for c in commands if c] commands = _expand_tnt_commands_from_prompt(commands, prompt, player, config) if not commands: add_sudo_history(player, prompt, [], []) rcon( f'tellraw {player} {{"text":"[SUDO] No safe command generated.","color":"yellow"}}', config["rcon_host"], config["rcon_port"], config["rcon_password"] ) return # Show translated command(s) privately safe_preview = " | ".join(commands).replace("\\", "\\\\").replace('"', '\\"') rcon( f'tellraw {player} {{"text":"[SUDO] {safe_preview}","color":"dark_gray","italic":true}}', config["rcon_host"], config["rcon_port"], config["rcon_password"] ) executed = [] results_seen = [] for cmd in commands: if cmd.lower().startswith("template "): log.info(f"SUDO template action: {cmd}") try: process_sudo_template_command(player, cmd, config) executed.append(cmd) results_seen.append((cmd, "template action executed")) except Exception as e: log.warning(f"SUDO template action failed: {e}") results_seen.append((cmd, f"template action failed: {e}")) time.sleep(0.15) continue resolved, is_safe = validate_command(cmd, online, player, config) if not is_safe: continue log.info(f"SUDO execute: {resolved}") _sudo_trace(player, f"[SUDO TRY] {resolved}", config) result = rcon(resolved, config["rcon_host"], config["rcon_port"], config["rcon_password"]) log.info(f"SUDO result: {result!r}") _sudo_trace(player, f"[SUDO RES] {str(result or '')[:180]}", config) executed.append(resolved) results_seen.append((resolved, str(result or ""))) time.sleep(0.2) effective_hits = sum(1 for _, res in results_seen if _sudo_result_is_effective(res)) ineffective = (len(executed) == 0) or (effective_hits == 0) # Generic failed-execution repair pass. if ineffective: retry_cmds = _repair_failed_sudo_commands(player, results_seen, config) if retry_cmds: log.warning(f"SUDO retry pipeline engaged: {retry_cmds}") _send_private(player, "[SUDO] Initial command failed; retrying with repaired syntax.", config, "yellow") for cmd in retry_cmds: resolved, is_safe = validate_command(cmd, online, player, config) if not is_safe: continue log.info(f"SUDO retry execute: {resolved}") _sudo_trace(player, f"[SUDO RETRY] {resolved}", config, color="yellow") result = rcon(resolved, config["rcon_host"], config["rcon_port"], config["rcon_password"]) log.info(f"SUDO retry result: {result!r}") _sudo_trace(player, f"[SUDO RETRY RES] {str(result or '')[:180]}", config, color="yellow") executed.append(resolved) results_seen.append((resolved, str(result or ""))) time.sleep(0.12) effective_hits = sum(1 for _, res in results_seen if _sudo_result_is_effective(res)) ineffective = (len(executed) == 0) or (effective_hits == 0) # Adaptive fallback for destructive intent when output appears ineffective. if _is_destructive_intent(prompt) and ineffective and "tnt" not in prompt.lower(): fallback_cmds = _build_destructive_fallback(player, config) log.warning(f"SUDO destructive fallback engaged for prompt={prompt!r}: {fallback_cmds}") _send_private(player, "[SUDO] Initial plan was weak; applying destructive fallback.", config, "yellow") for cmd in fallback_cmds[:max_cmds]: resolved, is_safe = validate_command(cmd, online, player, config) if not is_safe: continue log.info(f"SUDO fallback execute: {resolved}") _sudo_trace(player, f"[SUDO FALLBACK] {resolved}", config, color="red") result = rcon(resolved, config["rcon_host"], config["rcon_port"], config["rcon_password"]) log.info(f"SUDO fallback result: {result!r}") _sudo_trace(player, f"[SUDO FALLBACK RES] {str(result or '')[:180]}", config, color="red") executed.append(resolved) results_seen.append((resolved, str(result or ""))) time.sleep(0.15) effective_hits = sum(1 for _, res in results_seen if _sudo_result_is_effective(res)) ineffective = (len(executed) == 0) or (effective_hits == 0) if _is_fire_intent(prompt) and ineffective: fire_retry = [f"execute at {player} run fill ~-25 ~-1 ~-25 ~25 ~3 ~25 minecraft:fire replace air"] log.warning(f"SUDO fire fallback engaged for prompt={prompt!r}: {fire_retry}") for cmd in fire_retry: resolved, is_safe = validate_command(cmd, online, player, config) if not is_safe: continue log.info(f"SUDO fire fallback execute: {resolved}") _sudo_trace(player, f"[SUDO FIRE FALLBACK] {resolved}", config, color="gold") result = rcon(resolved, config["rcon_host"], config["rcon_port"], config["rcon_password"]) log.info(f"SUDO fire fallback result: {result!r}") _sudo_trace(player, f"[SUDO FIRE RES] {str(result or '')[:180]}", config, color="gold") executed.append(resolved) results_seen.append((resolved, str(result or ""))) time.sleep(0.15) effective_hits = sum(1 for _, res in results_seen if _sudo_result_is_effective(res)) ineffective = (len(executed) == 0) or (effective_hits == 0) for cmd, res in results_seen: if not _sudo_result_is_effective(res): add_sudo_failure(player, cmd, res) set_last_sudo_feedback(player, prompt, results_seen, ineffective) _report_sudo_feedback(player, prompt, commands, results_seen, ineffective, config) add_sudo_history(player, prompt, commands, executed) # Training audit: log the full sudo interaction write_training_audit( player=player, mode="sudo", user_message=f"sudo {prompt}", commands_generated=commands, commands_executed=executed, message="", context={"online_players": online}, config=config, rcon_results=[(cmd, res) for cmd, res in results_seen], ) def get_player_xyz(player: str, config): """Return integer xyz for a player using RCON entity data.""" raw = rcon( f"data get entity {player} Pos", config["rcon_host"], config["rcon_port"], config["rcon_password"], ) vals = re.findall(r'(-?[\d.]+)d', raw) if len(vals) >= 3: return int(float(vals[0])), int(float(vals[1])), int(float(vals[2])) return None def build_template_commands(player: str, prompt: str, config): """ Deterministic paper/builder templates for sudo build requests. Returns command list or None if prompt is not a build template request. """ p = prompt.lower().strip() if not (p.startswith("build ") or p.startswith("make ") or p.startswith("create ")): return None pos = get_player_xyz(player, config) if not pos: return [ f"give {player} minecraft:oak_log 256", f"give {player} minecraft:oak_planks 256", f"give {player} minecraft:stone_bricks 256", f"give {player} minecraft:torch 64", f"give {player} minecraft:oak_door 4", f"give {player} minecraft:glass 64", ] x, y, z = pos # Place slightly in front of player position to avoid trapping player. bx, by, bz = x + 3, y, z + 3 # Simple templates built with fill/setblock so they are plugin-agnostic and fast on Paper. if "house" in p or "hut" in p or "cabin" in p: return [ f"fill {bx} {by} {bz} {bx+6} {by} {bz+6} minecraft:oak_planks", f"fill {bx} {by+1} {bz} {bx+6} {by+4} {bz+6} minecraft:air", f"fill {bx} {by+1} {bz} {bx+6} {by+3} {bz} minecraft:oak_planks", f"fill {bx} {by+1} {bz+6} {bx+6} {by+3} {bz+6} minecraft:oak_planks", f"fill {bx} {by+1} {bz} {bx} {by+3} {bz+6} minecraft:oak_planks", f"fill {bx+6} {by+1} {bz} {bx+6} {by+3} {bz+6} minecraft:oak_planks", f"setblock {bx+3} {by+1} {bz} minecraft:oak_door", f"setblock {bx+3} {by+2} {bz} minecraft:oak_door[half=upper]", f"fill {bx} {by+4} {bz} {bx+6} {by+4} {bz+6} minecraft:spruce_planks", f"setblock {bx+1} {by+1} {bz+1} minecraft:crafting_table", f"setblock {bx+2} {by+1} {bz+1} minecraft:furnace", f"setblock {bx+5} {by+1} {bz+5} minecraft:red_bed", f"give {player} minecraft:torch 16", ] if "tower" in p: return [ f"fill {bx} {by} {bz} {bx+4} {by} {bz+4} minecraft:stone_bricks", f"fill {bx} {by+1} {bz} {bx+4} {by+15} {bz+4} minecraft:air", f"fill {bx} {by+1} {bz} {bx} {by+14} {bz+4} minecraft:stone_bricks", f"fill {bx+4} {by+1} {bz} {bx+4} {by+14} {bz+4} minecraft:stone_bricks", f"fill {bx} {by+1} {bz} {bx+4} {by+14} {bz} minecraft:stone_bricks", f"fill {bx} {by+1} {bz+4} {bx+4} {by+14} {bz+4} minecraft:stone_bricks", f"setblock {bx+2} {by+1} {bz} minecraft:iron_door", f"setblock {bx+2} {by+2} {bz} minecraft:iron_door[half=upper]", f"fill {bx} {by+15} {bz} {bx+4} {by+15} {bz+4} minecraft:stone_bricks", f"give {player} minecraft:ladder 64", ] if "wall" in p: return [ f"fill {bx} {by} {bz} {bx+25} {by+4} {bz} minecraft:cobblestone", f"fill {bx+2} {by+1} {bz} {bx+3} {by+2} {bz} minecraft:air", f"setblock {bx+2} {by+1} {bz} minecraft:oak_fence_gate", f"setblock {bx+2} {by+2} {bz} minecraft:oak_fence_gate", ] if "church" in p or "shrine" in p or "temple" in p: return [ f"fill {bx} {by} {bz} {bx+10} {by} {bz+14} minecraft:stone_bricks", f"fill {bx} {by+1} {bz} {bx+10} {by+8} {bz+14} minecraft:air", f"fill {bx} {by+1} {bz} {bx+10} {by+6} {bz} minecraft:stone_bricks", f"fill {bx} {by+1} {bz+14} {bx+10} {by+6} {bz+14} minecraft:stone_bricks", f"fill {bx} {by+1} {bz} {bx} {by+6} {bz+14} minecraft:stone_bricks", f"fill {bx+10} {by+1} {bz} {bx+10} {by+6} {bz+14} minecraft:stone_bricks", f"setblock {bx+5} {by+1} {bz} minecraft:oak_door", f"setblock {bx+5} {by+2} {bz} minecraft:oak_door[half=upper]", f"fill {bx+4} {by+1} {bz+2} {bx+6} {by+1} {bz+4} minecraft:quartz_block", f"setblock {bx+5} {by+2} {bz+3} minecraft:lantern", f"fill {bx+2} {by+7} {bz+2} {bx+8} {by+9} {bz+12} minecraft:spruce_planks", f"summon minecraft:firework_rocket {bx+5} {by+2} {bz+7}", ] # Generic fallback build package return [ f"give {player} minecraft:oak_log 256", f"give {player} minecraft:oak_planks 256", f"give {player} minecraft:stone_bricks 256", f"give {player} minecraft:torch 64", f"give {player} minecraft:oak_door 4", f"give {player} minecraft:glass 64", ] # --------------------------------------------------------------------------- # Prayer handler # --------------------------------------------------------------------------- BIBLE_LINES = [ ("", "gold", True), ("[=== THE HOLY SCRIPTURE ===]", "gold", True), ("", "gold", True), ("God watches over this server.", "yellow", False), ("Speak to him by typing in chat:", "white", False), (" pray ", "green", True), (" bug_log ", "aqua", True), ("", "white", False), ("God is benevolent, but just.", "yellow", False), ("He hears every prayer — but answers as he sees fit.", "white", False), ("He may reward you, punish you, or act upon another player entirely.", "white", False), ("", "white", False), ("Examples:", "yellow", False), (" pray Lord, bless my journey through the mines.", "gray", False), (" pray Smite my enemy, for they have wronged me.", "gray", False), (" pray Forgive me, I have sinned against thy creations.", "gray", False), (" bug_log creeper explosion desynced and killed me", "dark_aqua", False), ("", "white", False), ("Thou may only pray once every 20 seconds.", "red", False), ("Type \"bible\" in chat to see this again.", "gray", False), ("God intervenes unprompted. Watch the skies.", "dark_purple", True), ("", "gold", True), ("[========================]", "gold", True), ("", "gold", True), ] def send_bible(player, config): log.info(f"/bible requested by {player}") h = config["rcon_host"] p = config["rcon_port"] pw = config["rcon_password"] for text, color, bold in BIBLE_LINES: bold_str = "true" if bold else "false" safe = text.replace('"', '\\"') rcon(f'tellraw {player} {{"text":"{safe}","color":"{color}","bold":{bold_str}}}', h, p, pw) ACK_MESSAGES = [ "Your prayer has been received. The heavens stir...", "The divine ear turns toward thee. Await judgement...", "A silence falls across the heavens. God is listening...", "Thy words rise like incense. An answer approaches...", "The cosmos trembles with thy supplication. Patience...", ] def process_prayer(player, prayer, config, cooldowns): online = players_online(config) if not online: log.info("Prayer received but no players online — dropping") return now = time.time() last = cooldowns.get(player, 0) cooldown_secs = config.get("cooldown_seconds", 60) if now - last < cooldown_secs: remaining = int(cooldown_secs - (now - last)) rcon( f'tellraw {player} {{"text":"[GOD] Thou must wait {remaining} more seconds before praying again.","color":"gold"}}', config["rcon_host"], config["rcon_port"], config["rcon_password"] ) return cooldowns[player] = now # Immediate acknowledgment ack = random.choice(ACK_MESSAGES) rcon( f'tellraw {player} {{"text":"[GOD] {ack}","color":"gray","italic":true}}', config["rcon_host"], config["rcon_port"], config["rcon_password"] ) try: context = get_server_context(config) log.info(f"Server context: {context}") except Exception as e: log.warning(f"Could not fetch server context: {e}") context = {"online_players": online, "time_of_day": "unknown", "weather": "unknown", "world_border": None} try: response = ask_god(player, prayer[:300], context, config) except json.JSONDecodeError as e: log.error(f"LLM returned invalid JSON: {e}") rcon( f'tellraw @a {{"text":"[GOD] ...","color":"dark_gray"}}', config["rcon_host"], config["rcon_port"], config["rcon_password"] ) return except Exception as e: log.error(f"LLM error: {e}") return execute_response(response, context, config, praying_player=player) # Store in prayer memory so God remembers this exchange god_msg = response.get("message") or "" if god_msg: add_prayer_memory(player, prayer, god_msg, config) # Training audit: log the full interaction as a candidate training pair write_training_audit( player=player, mode="god", user_message=f"pray {prayer}", commands_generated=response.get("commands") or [], commands_executed=response.get("commands") or [], # prayer path doesn't track executed separately message=god_msg, context=context, config=config, ) # --------------------------------------------------------------------------- # Divine intervention timer # --------------------------------------------------------------------------- def next_intervention_delay(avg_per_day): avg_seconds = 86400.0 / avg_per_day return random.expovariate(1.0 / avg_seconds) def divine_intervention_loop(config): avg_per_day = config.get("interventions_per_day", 4) if avg_per_day <= 0: log.info("Divine intervention disabled (interventions_per_day=0)") return log.info(f"Divine intervention loop started — avg {avg_per_day}/day") while True: delay = next_intervention_delay(avg_per_day) log.info(f"Next divine intervention in {delay/3600:.2f}h ({int(delay)}s)") time.sleep(delay) online = players_online(config) if not online: log.info("Intervention timer fired — no players online, skipping") continue try: context = get_server_context(config) context["online_players"] = online except Exception as e: log.warning(f"Intervention: could not fetch server context: {e}") context = {"online_players": online, "time_of_day": "unknown", "weather": "unknown", "world_border": None} try: response = ask_god_intervention(context, config) except Exception as e: log.error(f"Intervention LLM error: {e}") continue if not (response.get("message") or response.get("commands")): log.info("God chose silence this interval.") continue log.info("God intervenes unprompted.") execute_response(response, context, config, praying_player=None) # --------------------------------------------------------------------------- # Log tail # --------------------------------------------------------------------------- def tail_log(log_path): """ Follow latest.log robustly across truncation/rotation. Paper can recreate latest.log on restart; we detect inode changes and reopen. """ f = open(log_path, 'r') f.seek(0, 2) current_inode = os.fstat(f.fileno()).st_ino while True: line = f.readline() if line: yield line continue # No new line yet; check for rotation/truncation try: st = os.stat(log_path) if st.st_ino != current_inode or st.st_size < f.tell(): f.close() f = open(log_path, 'r') current_inode = os.fstat(f.fileno()).st_ino f.seek(0, 2) log.info("Reattached log tail after latest.log rotation/truncation") except FileNotFoundError: # During restart, file may not exist briefly pass time.sleep(0.2) # --------------------------------------------------------------------------- # Main # --------------------------------------------------------------------------- def main(): with open(CONFIG_PATH) as f: config = json.load(f) log.info(f"mc_aigod starting — server: {config['server_name']}") log.info(f"Log: {config['log_path']}") log.info(f"LLM: {config['ollama_url']} model={config['model']}") log.info(f"RCON: {config['rcon_host']}:{config['rcon_port']}") load_prayer_memory(config) load_first_login_seen(config) cooldowns = {} t = threading.Thread(target=divine_intervention_loop, args=(config,), daemon=True) t.start() for line in tail_log(config["log_path"]): # Feed every line into the rolling log buffer add_log_event(line) # bug logging trigger matched = False for pat in BUG_LOG_PATTERNS: m = pat.search(line) if m: player = m.group(1) description = (m.group(2) or "").strip() log.info(f"BUG_LOG from {player}: {description or '(no description)'}") try: process_bug_log(player, description, config) except Exception as e: log.error(f"Error processing bug_log: {e}", exc_info=True) matched = True break if matched: continue # sudo translator matched = False for pat in SUDO_PATTERNS: m = pat.search(line) if m: player = m.group(1) prompt = m.group(2).strip() log.info(f"SUDO from {player}: {prompt}") try: process_sudo(player, prompt, config) except Exception as e: log.error(f"Error processing sudo: {e}", exc_info=True) matched = True break if matched: continue # /pray matched = False for pat in PRAY_PATTERNS: m = pat.search(line) if m: player = m.group(1) prayer = m.group(2).strip() log.info(f"Prayer from {player}: {prayer}") try: process_prayer(player, prayer, config, cooldowns) except Exception as e: log.error(f"Error processing prayer: {e}", exc_info=True) matched = True break if matched: continue # /bible for pat in BIBLE_PATTERNS: m = pat.search(line) if m: player = m.group(1) try: send_bible(player, config) except Exception as e: log.error(f"Error sending bible to {player}: {e}", exc_info=True) break # login notice m = JOIN_PATTERN.search(line) if m: player = m.group(1) log.info(f"Login notice → {player}") try: rcon( f'tellraw {player} {{"text":"[GOD] GOD ENABLED — Type \\"bible\\" in chat for guidance. Type \\"pray \\" to pray.","color":"gold","bold":true}}', config["rcon_host"], config["rcon_port"], config["rcon_password"] ) except Exception as e: log.error(f"Error sending login notice to {player}: {e}", exc_info=True) # First-login benevolence (once per player) try: process_first_login_benevolence(player, config) except Exception as e: log.error(f"Error running first-login benevolence for {player}: {e}", exc_info=True) if __name__ == '__main__': main()