Files
minecraft-ai-god-paper-fork/mc_aigod_paper.py
T
2026-03-16 19:41:53 -04:00

2854 lines
110 KiB
Python

#!/usr/bin/env python3
"""
mc_aigod.py — Minecraft AI God watcher
Intercepts /pray commands, fetches live server state, queries Ollama,
validates targets, and executes commands via RCON.
Config: /etc/mc_aigod.json
"""
import json, os, random, re, socket, struct, threading, time, logging
from collections import deque
from datetime import datetime
import shutil
from urllib.parse import parse_qs, unquote, urljoin, urlparse
import requests
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [aigod] %(levelname)s: %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler('/var/log/mc_aigod_paper.log'),
]
)
log = logging.getLogger(__name__)
CONFIG_PATH = '/etc/mc_aigod_paper.json'
PRAY_PATTERNS = [
re.compile(r'\[.*?\]: <(\w+)> [Pp]ray (.+)'),
]
BIBLE_PATTERNS = [
re.compile(r'\[.*?\]: <(\w+)> [Bb]ible\s*$'),
]
SUDO_PATTERNS = [
re.compile(r'\[.*?\]: <(\w+)> [Ss]udo (.+)'),
]
JOIN_PATTERN = re.compile(r'\[.*?\]: (\w+) joined the game')
LEAVE_PATTERN = re.compile(r'\[.*?\]: (\w+) left the game')
# Interesting server events to capture for God's awareness
# Matches: chat, deaths, joins, leaves — skips RCON/thread noise
LOG_INTERESTING = re.compile(
r'\[.*?(?:Server thread|Async Chat).*?INFO\]: '
r'(?:<\w+>.*|' # chat
r'\w+ joined the game|' # join
r'\w+ left the game|' # leave
r'\w+ (?:died|was slain|was shot|drowned|fell|burned|blew up|suffocated|starved|was killed|hit the ground|went up in flames|tried to swim in lava).*)'
)
# ---------------------------------------------------------------------------
# Shared memory buffers (module-level, shared across threads)
# ---------------------------------------------------------------------------
# Rolling log buffer — keeps last LOG_MAX_LINES interesting events,
# pruning anything older than LOG_MAX_HOURS. Whichever limit hits first.
LOG_MAX_LINES = 200
LOG_MAX_HOURS = 3
recent_log: deque = deque() # entries: (timestamp_float, str)
# God's prayer memory — last N prayer/response pairs across all players
# Stored as (player, prayer_text, god_message) tuples
PRAYER_MEMORY_SIZE = 10
prayer_memory: deque = deque() # entries: (player, prayer, god_message)
# First-login benevolence memory — players already blessed on first join
first_login_seen = set()
# Sudo translator memory — last N sudo translations/executions
SUDO_HISTORY_SIZE = 10
sudo_history: deque = deque() # entries: (ts, player, prompt, translated_cmds, executed_cmds)
_memory_lock = threading.Lock()
# Gateway client session mapping (player+mode -> session_id)
_gateway_sessions = {}
_gateway_lock = threading.Lock()
DEFAULT_TEMPLATE_DIR = "/opt/paper-ai-25567/worldkit_templates"
DEFAULT_TEMPLATE_HOSTS = [
"raw.githubusercontent.com",
"github.com",
"planetminecraft.com",
"www.planetminecraft.com",
"modrinth.com",
"cdn.modrinth.com",
]
DEFAULT_TEMPLATE_SEARCH_BLOCKED_HOSTS = [
"planetminecraft.com",
"www.planetminecraft.com",
]
DEFAULT_FAWE_SCHEM_DIR = "/opt/paper-ai-25567/plugins/FastAsyncWorldEdit/schematics"
TEMPLATE_SEARCH_CACHE_SIZE = 20
_template_search_cache = {}
_template_search_lock = threading.Lock()
_template_last_downloaded = {}
def add_log_event(line: str):
"""Add a meaningful log line to the rolling buffer."""
if not LOG_INTERESTING.search(line):
return
m = re.search(r'\[.*?INFO\]: (.+)', line)
if not m:
return
entry = m.group(1).strip()
now = time.time()
with _memory_lock:
recent_log.append((now, entry))
# Prune by age
cutoff = now - (LOG_MAX_HOURS * 3600)
while recent_log and recent_log[0][0] < cutoff:
recent_log.popleft()
# Prune by line count
while len(recent_log) > LOG_MAX_LINES:
recent_log.popleft()
def _send_private(player: str, text: str, config, color: str = "gray", italic: bool = False):
safe = text.replace("\\", "\\\\").replace('"', '\\"').replace("\n", " ").replace("\r", " ")
italic_raw = "true" if italic else "false"
rcon(
f'tellraw {player} {{"text":"{safe}","color":"{color}","italic":{italic_raw}}}',
config["rcon_host"], config["rcon_port"], config["rcon_password"],
)
def _template_path(config) -> str:
return config.get("template_dir", DEFAULT_TEMPLATE_DIR)
def _fawe_schem_path(config) -> str:
return config.get("fawe_schem_dir", DEFAULT_FAWE_SCHEM_DIR)
def _template_hosts(config) -> list:
hosts = config.get("template_allowed_hosts", DEFAULT_TEMPLATE_HOSTS)
return [str(h).lower().strip() for h in hosts if str(h).strip()]
def _template_search_blocked_hosts(config) -> list:
hosts = config.get("template_search_blocked_hosts", DEFAULT_TEMPLATE_SEARCH_BLOCKED_HOSTS)
return [str(h).lower().strip() for h in hosts if str(h).strip()]
def _is_search_blocked_host(url: str, config) -> bool:
try:
host = urlparse(url).netloc.lower()
except Exception:
return False
for h in _template_search_blocked_hosts(config):
if host == h or host.endswith("." + h):
return True
return False
def _safe_template_name(name: str) -> str:
n = re.sub(r'[^a-zA-Z0-9._-]+', '_', name.strip())
return n[:80] if n else f"template_{int(time.time())}"
def _is_allowed_template_url(url: str, config) -> bool:
try:
p = urlparse(url)
if p.scheme not in ("https",):
return False
host = (p.netloc or "").lower()
allowed = _template_hosts(config)
return any(host == h or host.endswith("." + h) for h in allowed)
except Exception:
return False
def _extract_ddg_links_from_html(html: str) -> list:
links = []
for href in re.findall(r'href="([^"]+)"', html):
if "/l/?" not in href:
continue
parsed = urlparse(href)
qs = parse_qs(parsed.query)
target = (qs.get("uddg") or [""])[0]
if not target:
continue
url = unquote(target)
if url.startswith("http://") or url.startswith("https://"):
links.append(url)
return links
def _search_templates(query: str, config) -> list:
# No page scraping: return direct file URLs only.
limit = int(config.get("template_search_limit", 8))
out = []
seen = set()
qlow = query.lower().strip()
def add_url(url: str, text: str = ""):
if url in seen:
return
if not _is_allowed_template_url(url, config):
return
path = urlparse(url).path.lower()
if not re.search(r'\.(schem|schematic|nbt|zip)$', path):
return
seen.add(url)
out.append({"url": url, "text": text[:160]})
# First: include configured sync catalog entries that match query.
try:
for e in _template_sync_entries(config):
u = e.get("url", "")
nm = e.get("name", "")
blob = f"{u} {nm}".lower()
if qlow and qlow not in blob:
continue
add_url(u, "sync source")
if len(out) >= limit:
return out
except Exception as e:
log.debug(f"Template sync catalog presearch failed: {e}")
for ext in ("schem", "schematic", "nbt", "zip"):
if len(out) >= limit:
break
try:
r = requests.get(
"https://api.github.com/search/code",
params={"q": f"{query} extension:{ext} in:path", "per_page": 10},
timeout=20,
headers={"Accept": "application/vnd.github+json"},
)
if r.status_code != 200:
continue
data = r.json()
for item in data.get("items") or []:
html_url = item.get("html_url") or ""
if not html_url:
continue
p = urlparse(html_url)
if p.netloc.lower() == "github.com" and "/blob/" in p.path:
raw_path = p.path.replace("/blob/", "/", 1)
raw = f"https://raw.githubusercontent.com{raw_path}"
add_url(raw, "github code search")
if len(out) >= limit:
break
except Exception as e:
log.debug(f"Template GitHub code search failed: {e}")
return out
def _format_bytes(n: int) -> str:
if n < 1024:
return f"{n}B"
if n < 1024 * 1024:
return f"{n/1024:.1f}KB"
return f"{n/(1024*1024):.1f}MB"
def _cache_template_search(player: str, rows: list):
urls = [str(r.get("url", "")).strip() for r in rows if isinstance(r, dict)]
urls = [u for u in urls if u]
with _template_search_lock:
_template_search_cache[player] = {
"ts": time.time(),
"urls": urls[:TEMPLATE_SEARCH_CACHE_SIZE],
}
def _get_cached_template_url(player: str, token: str) -> str:
tok = token.strip().strip("[](){}\\")
if tok.startswith("#"):
tok = tok[1:]
if not tok.isdigit():
return ""
idx = int(tok)
if idx <= 0:
return ""
with _template_search_lock:
entry = _template_search_cache.get(player) or {}
urls = entry.get("urls") or []
if idx > len(urls):
return ""
return urls[idx - 1]
def _get_cached_template_urls(player: str) -> list:
with _template_search_lock:
entry = _template_search_cache.get(player) or {}
urls = entry.get("urls") or []
return [str(u) for u in urls if str(u).strip()]
def _list_templates(config, max_rows: int = 12) -> list:
root = _template_path(config)
os.makedirs(root, exist_ok=True)
rows = []
for name in sorted(os.listdir(root)):
full = os.path.join(root, name)
if not os.path.isfile(full):
continue
try:
size = os.path.getsize(full)
except OSError:
size = 0
rows.append({"name": name, "size": size})
rows.sort(key=lambda x: x["name"].lower())
return rows[:max_rows]
def _delete_template(name: str, config) -> str:
safe = os.path.basename(name.strip())
if not safe:
raise ValueError("Missing template filename")
if safe != name.strip():
raise ValueError("Invalid template filename")
full = os.path.join(_template_path(config), safe)
if not os.path.isfile(full):
raise ValueError("Template file not found")
os.remove(full)
return full
def _download_template(url: str, name_hint: str, config) -> str:
return _download_template_inner(url, name_hint, config, depth=0)
def _template_sync_entries(config) -> list:
entries = []
for url in config.get("template_sync_urls", []) or []:
u = str(url).strip()
if u:
entries.append({"url": u, "name": ""})
manifest_url = str(config.get("template_sync_manifest_url", "")).strip()
if manifest_url:
r = requests.get(manifest_url, timeout=25)
r.raise_for_status()
data = r.json()
if isinstance(data, list):
for item in data:
if isinstance(item, str):
entries.append({"url": item.strip(), "name": ""})
elif isinstance(item, dict):
entries.append({"url": str(item.get("url", "")).strip(), "name": str(item.get("name", "")).strip()})
elif isinstance(data, dict):
for item in data.get("templates", []) or []:
if isinstance(item, str):
entries.append({"url": item.strip(), "name": ""})
elif isinstance(item, dict):
entries.append({"url": str(item.get("url", "")).strip(), "name": str(item.get("name", "")).strip()})
deduped = []
seen = set()
for e in entries:
u = e.get("url", "").strip()
if not u or u in seen:
continue
if not _is_allowed_template_url(u, config):
continue
seen.add(u)
deduped.append({"url": u, "name": e.get("name", "").strip()})
return deduped
def _template_sync(config) -> dict:
entries = _template_sync_entries(config)
if not entries:
return {"ok": 0, "failed": 0, "errors": ["No template sync sources configured"]}
max_files = int(config.get("template_sync_max_files", 50))
ok = 0
failed = 0
errors = []
for e in entries[:max_files]:
try:
_download_template(e["url"], e.get("name", ""), config)
ok += 1
except Exception as ex:
failed += 1
errors.append(f"{e['url']}: {ex}")
return {"ok": ok, "failed": failed, "errors": errors[:6]}
def _extract_template_candidates_from_html(base_url: str, html: str, config) -> list:
candidates = []
seen = set()
for href in re.findall(r'href=["\']([^"\']+)["\']', html, re.IGNORECASE):
full = urljoin(base_url, href.strip())
if full in seen:
continue
seen.add(full)
p = urlparse(full)
host = p.netloc.lower()
path = p.path.lower()
if host == "github.com" and "/blob/" in p.path and re.search(r'\.(schem|schematic|nbt|zip)$', path):
raw_path = p.path.replace("/blob/", "/", 1)
raw = f"https://raw.githubusercontent.com{raw_path}"
if _is_allowed_template_url(raw, config):
candidates.append(raw)
continue
if re.search(r'\.(schem|schematic|nbt|zip)$', path):
if _is_allowed_template_url(full, config):
candidates.append(full)
return candidates
def _extract_template_page_candidates_from_html(base_url: str, html: str, config) -> list:
pages = []
seen = set()
base_host = urlparse(base_url).netloc.lower()
for href in re.findall(r'href=["\']([^"\']+)["\']', html, re.IGNORECASE):
full = urljoin(base_url, href.strip())
if full in seen:
continue
seen.add(full)
p = urlparse(full)
host = p.netloc.lower()
path = p.path.lower()
if _is_search_blocked_host(full, config):
continue
if not _is_allowed_template_url(full, config):
continue
if re.search(r'\.(schem|schematic|nbt|zip)$', path):
continue
if re.search(r'\.(png|jpg|jpeg|gif|webp|svg|ico|css|js|txt|json|xml|pdf)$', path):
continue
if '/search' in path or '/projects/' == path or path.endswith('/projects/'):
continue
# Prefer detail pages on same domain as seed page.
if host == base_host or 'planetminecraft.com' in host or 'modrinth.com' in host or host == 'github.com':
pages.append(full)
return pages[:6]
def _looks_like_template_binary(first_bytes: bytes, content_type: str) -> bool:
if not first_bytes:
return False
# Known binary containers used by schematics/templates.
if first_bytes.startswith(b"\x1f\x8b"): # gzip
return True
if first_bytes.startswith(b"PK\x03\x04"): # zip
return True
if first_bytes[:1] == b"\x0a": # NBT tag root (common for uncompressed NBT)
return True
# Common non-template binary signatures to reject.
if first_bytes.startswith(b"\x89PNG\r\n\x1a\n"):
return False
if first_bytes.startswith(b"\xff\xd8\xff"): # jpeg
return False
if first_bytes.startswith(b"GIF87a") or first_bytes.startswith(b"GIF89a"):
return False
if first_bytes.startswith(b"%PDF"):
return False
ct = (content_type or "").lower()
if ct.startswith("text/") or "html" in ct or "css" in ct or "javascript" in ct or "json" in ct:
return False
# Heuristic: templates should not look like plain text/CSS/HTML.
sample = first_bytes[:256]
printable = sum(1 for b in sample if 9 <= b <= 13 or 32 <= b <= 126)
ratio = printable / max(1, len(sample))
low = sample.lower()
if ratio > 0.9 and (b"<html" in low or b"<!doctype" in low or b".light" in low or b"body{" in low):
return False
if ratio > 0.97:
return False
# If it is not one of known schematic container signatures, reject.
return False
def _download_template_inner(url: str, name_hint: str, config, depth: int = 0, visited=None) -> str:
if visited is None:
visited = set()
if url in visited:
raise ValueError("Template URL loop detected")
visited.add(url)
if depth > 2:
raise ValueError("Could not resolve a direct template file URL")
p_in = urlparse(url)
if p_in.netloc.lower() == "github.com" and "/blob/" in p_in.path:
# Convert common GitHub blob URL to raw URL automatically.
raw_path = p_in.path.replace("/blob/", "/", 1)
url = f"https://raw.githubusercontent.com{raw_path}"
if not _is_allowed_template_url(url, config):
raise ValueError("Template URL host not allowed")
p = urlparse(url)
base = os.path.basename(p.path) or "template.schem"
if "." in base:
ext = "." + base.split(".")[-1].lower()
else:
ext = ".schem"
if ext not in (".schem", ".schematic", ".nbt", ".zip"):
ext = ".schem"
fname = _safe_template_name(name_hint or os.path.splitext(base)[0]) + ext
out_dir = _template_path(config)
os.makedirs(out_dir, exist_ok=True)
out_path = os.path.join(out_dir, fname)
try:
with requests.get(url, stream=True, timeout=45) as r:
if r.status_code == 403:
raise ValueError(f"Source blocked automated download (403): {urlparse(url).netloc}")
r.raise_for_status()
content_type = (r.headers.get("Content-Type") or "").lower()
if "text/html" in content_type:
raise ValueError("URL returned HTML page. template download requires a direct file URL")
total = 0
max_bytes = int(config.get("template_max_bytes", 10 * 1024 * 1024))
first_bytes = b""
with open(out_path, "wb") as f:
for chunk in r.iter_content(chunk_size=65536):
if not chunk:
continue
if not first_bytes:
first_bytes = chunk[:512]
if not _looks_like_template_binary(first_bytes, content_type):
raise ValueError("Downloaded content is not a valid binary template file")
total += len(chunk)
if total > max_bytes:
raise ValueError("Template file too large")
f.write(chunk)
if os.path.isfile(out_path) and os.path.getsize(out_path) == 0:
raise ValueError("Downloaded empty template file")
except Exception:
try:
if os.path.isfile(out_path):
os.remove(out_path)
except Exception:
pass
raise
return out_path
def _cache_last_download(player: str, path: str):
with _template_search_lock:
_template_last_downloaded[player] = path
def _last_download(player: str) -> str:
with _template_search_lock:
return _template_last_downloaded.get(player, "")
def _resolve_template_file(name_or_file: str, player: str, config) -> str:
token = (name_or_file or "").strip().strip("[](){}\\")
if not token:
return _last_download(player)
root = _template_path(config)
direct = os.path.join(root, os.path.basename(token))
if os.path.isfile(direct):
return direct
stem = os.path.basename(token)
for ext in (".schem", ".schematic", ".nbt", ".zip"):
cand = os.path.join(root, stem + ext)
if os.path.isfile(cand):
return cand
return ""
def _stage_template_for_fawe(src_path: str, config) -> str:
schem_dir = _fawe_schem_path(config)
os.makedirs(schem_dir, exist_ok=True)
base = os.path.basename(src_path)
name_no_ext = _safe_template_name(os.path.splitext(base)[0])
dest = os.path.join(schem_dir, name_no_ext + ".schem")
shutil.copyfile(src_path, dest)
return name_no_ext
def _info_lookup_wiki(query: str) -> list:
try:
r = requests.get(
"https://minecraft.wiki/api.php",
params={
"action": "query",
"list": "search",
"srsearch": query,
"srlimit": 3,
"format": "json",
},
timeout=20,
)
r.raise_for_status()
data = r.json()
rows = []
for it in (data.get("query", {}).get("search", []) or [])[:3]:
title = str(it.get("title", "")).strip()
if not title:
continue
rows.append(f"minecraft.wiki: {title}")
return rows
except Exception:
return []
def _info_lookup_web(query: str) -> list:
try:
r = requests.get(
"https://api.duckduckgo.com/",
params={"q": query, "format": "json", "no_redirect": 1, "no_html": 1},
timeout=20,
)
r.raise_for_status()
data = r.json()
rows = []
abs_text = (data.get("AbstractText") or "").strip()
if abs_text:
rows.append(abs_text[:220])
heading = (data.get("Heading") or "").strip()
if heading and not abs_text:
rows.append(f"Topic: {heading}")
for it in data.get("RelatedTopics") or []:
if isinstance(it, dict) and it.get("Text"):
rows.append(str(it.get("Text"))[:220])
if len(rows) >= 3:
break
return rows[:3]
except Exception:
return []
def process_sudo_template_command(player: str, prompt: str, config) -> bool:
"""
Handle deterministic template management via sudo.
Returns True if prompt was a template command and has been handled.
"""
raw = prompt.strip()
p = raw.lower()
if not p.startswith("template "):
return False
if not (
p == "template help"
or
p.startswith("template search ")
or p.startswith("template download ")
or p.startswith("template install ")
or p.startswith("template pick ")
or p == "template sync"
or p.startswith("template build")
or p == "template list"
or p.startswith("template delete ")
or p == "template hosts"
):
_send_private(player, "[SUDO-TEMPLATE] Unknown template command. Use: template help", config, "yellow")
return True
try:
if p == "template help":
_send_private(player, "[SUDO-TEMPLATE] Commands:", config, "aqua")
_send_private(player, "- template search <query>", config, "gray")
_send_private(player, "- template download <https-url|#n|n> [name]", config, "gray")
_send_private(player, "- template install <https-url> [name]", config, "gray")
_send_private(player, "- template pick <n> [name] (download from last search)", config, "gray")
_send_private(player, "- template sync (pull from configured sync sources)", config, "gray")
_send_private(player, "- template build <filename|name> (or no arg = last download)", config, "gray")
_send_private(player, "- template list", config, "gray")
_send_private(player, "- template delete <filename>", config, "gray")
_send_private(player, "- template hosts", config, "gray")
return True
if p == "template hosts":
_send_private(player, "[SUDO-TEMPLATE] Allowed hosts:", config, "aqua")
for h in _template_hosts(config):
_send_private(player, f"- {h}", config, "gray")
return True
if p == "template list":
rows = _list_templates(config)
if not rows:
_send_private(player, "[SUDO-TEMPLATE] Template folder is empty.", config, "yellow")
return True
_send_private(player, "[SUDO-TEMPLATE] Local templates:", config, "aqua")
for row in rows:
_send_private(player, f"- {row['name']} ({_format_bytes(row['size'])})", config, "gray")
return True
if p.startswith("template delete "):
name = raw[len("template delete "):].strip()
if not name:
_send_private(player, "[SUDO-TEMPLATE] Usage: template delete <filename>", config, "yellow")
return True
deleted = _delete_template(name, config)
_send_private(player, f"[SUDO-TEMPLATE] Deleted: {deleted}", config, "green")
return True
if p.startswith("template search "):
query = raw[len("template search "):].strip()
if not query:
_send_private(player, "[SUDO-TEMPLATE] Usage: template search <query>", config, "yellow")
return True
rows = _search_templates(query, config)
if not rows:
_send_private(player, "[SUDO-TEMPLATE] No direct template links found.", config, "yellow")
_send_private(player, "[SUDO-TEMPLATE] Tip: configure sync sources then run: template sync", config, "dark_aqua")
return True
_cache_template_search(player, rows)
_send_private(player, "[SUDO-TEMPLATE] Top results:", config, "aqua")
for i, row in enumerate(rows[:8], 1):
_send_private(player, f"{i}) {row['url']}", config, "gray")
_send_private(player, "[SUDO-TEMPLATE] Tip: use 'sudo template pick <n> [name]'.", config, "dark_aqua")
return True
if p.startswith("template pick "):
rest = raw[len("template pick "):].strip()
if not rest:
_send_private(player, "[SUDO-TEMPLATE] Usage: template pick <n> [name]", config, "yellow")
return True
parts = rest.split()
token = parts[0]
name_start = 1
if token.lower() in ("option", "result") and len(parts) > 1:
token = parts[1]
name_start = 2
url = _get_cached_template_url(player, token)
if not url:
_send_private(player, "[SUDO-TEMPLATE] Invalid index. Run template search first.", config, "yellow")
return True
name_hint = " ".join(parts[name_start:]).strip() if len(parts) > name_start else ""
try:
out_path = _download_template(url, name_hint, config)
except Exception as first_err:
out_path = ""
urls = _get_cached_template_urls(player)
for fallback_url in urls:
if fallback_url == url:
continue
try:
out_path = _download_template(fallback_url, name_hint, config)
break
except Exception:
continue
if not out_path:
raise first_err
_cache_last_download(player, out_path)
_send_private(player, f"[SUDO-TEMPLATE] Downloaded from #{token.lstrip('#')}: {out_path}", config, "green")
return True
if p == "template sync":
_send_private(player, "[SUDO-TEMPLATE] Syncing templates...", config, "aqua")
res = _template_sync(config)
_send_private(player, f"[SUDO-TEMPLATE] Sync complete: ok={res['ok']} failed={res['failed']}", config, "green" if res['failed'] == 0 else "yellow")
for err in res.get("errors", [])[:3]:
_send_private(player, f"[SUDO-TEMPLATE] {err}", config, "gray")
return True
if p.startswith("template build"):
if not get_server_capabilities(config).get("template_build", False):
_send_private(player, "[SUDO-TEMPLATE] template build requires a Paper server with WorldEdit/FAWE.", config, "yellow")
return True
rest = raw[len("template build"):].strip()
src = _resolve_template_file(rest, player, config)
if not src:
_send_private(player, "[SUDO-TEMPLATE] Template not found. Use template list or download first.", config, "yellow")
return True
staged_name = _stage_template_for_fawe(src, config)
_send_private(player, f"[SUDO-TEMPLATE] Staged for FAWE: {os.path.basename(src)}", config, "aqua")
_send_private(player, f"[SUDO-TEMPLATE] Run in chat: //schem load {staged_name}", config, "gray")
_send_private(player, "[SUDO-TEMPLATE] Then run: //paste -a", config, "gray")
return True
cmd = "template download " if p.startswith("template download ") else "template install "
rest = raw[len(cmd):].strip()
if not rest:
_send_private(player, "[SUDO-TEMPLATE] Usage: template download <https-url|#n|n> [name]", config, "yellow")
return True
parts = rest.split()
first = parts[0]
token = first
name_start = 1
if first.lower() in ("option", "result") and len(parts) > 1:
token = parts[1]
name_start = 2
cached_url = _get_cached_template_url(player, token)
url = cached_url or first
if cached_url:
name_hint = " ".join(parts[name_start:]).strip() if len(parts) > name_start else ""
else:
name_hint = " ".join(parts[1:]).strip() if len(parts) > 1 else ""
out_path = _download_template(url, name_hint, config)
_cache_last_download(player, out_path)
_send_private(player, f"[SUDO-TEMPLATE] Downloaded: {out_path}", config, "green")
return True
except Exception as e:
log.warning(f"Template sudo command failed: {e}")
_send_private(player, f"[SUDO-TEMPLATE] Failed: {e}", config, "red")
return True
def _memory_path(config) -> str:
return config.get(
"memory_path",
"/opt/mcsmanager/daemon/data/InstanceData/shrinkborder1234567890abcdef12345/aigod_memory.json"
)
def save_prayer_memory(config):
"""Persist prayer memory to disk."""
try:
path = _memory_path(config)
with _memory_lock:
data = list(prayer_memory)
with open(path, 'w') as f:
json.dump(data, f)
log.debug(f"Prayer memory saved ({len(data)} entries)")
except Exception as e:
log.warning(f"Could not save prayer memory: {e}")
def load_prayer_memory(config):
"""Load prayer memory from disk on startup."""
try:
path = _memory_path(config)
with open(path) as f:
data = json.load(f)
with _memory_lock:
prayer_memory.clear()
for entry in data[-PRAYER_MEMORY_SIZE:]:
prayer_memory.append(tuple(entry))
log.info(f"Prayer memory loaded ({len(data)} entries from {path})")
except FileNotFoundError:
log.info("No prayer memory file found — starting fresh.")
except Exception as e:
log.warning(f"Could not load prayer memory: {e}")
def add_prayer_memory(player: str, prayer: str, god_message: str, config=None):
"""Record a completed prayer exchange and persist to disk."""
with _memory_lock:
prayer_memory.append((player, prayer[:200], god_message[:300]))
while len(prayer_memory) > PRAYER_MEMORY_SIZE:
prayer_memory.popleft()
if config:
save_prayer_memory(config)
def _first_login_path(config) -> str:
return config.get(
"first_login_path",
"/opt/mcsmanager/daemon/data/InstanceData/shrinkborder1234567890abcdef12345/aigod_first_login_seen.json"
)
def load_first_login_seen(config):
"""Load first-login blessing memory from disk."""
try:
path = _first_login_path(config)
with open(path) as f:
data = json.load(f)
with _memory_lock:
first_login_seen.clear()
for name in data:
first_login_seen.add(str(name))
log.info(f"First-login memory loaded ({len(data)} players from {path})")
except FileNotFoundError:
log.info("No first-login memory file found — starting fresh.")
except Exception as e:
log.warning(f"Could not load first-login memory: {e}")
def save_first_login_seen(config):
"""Persist first-login blessing memory to disk."""
try:
path = _first_login_path(config)
with _memory_lock:
data = sorted(first_login_seen)
with open(path, 'w') as f:
json.dump(data, f)
except Exception as e:
log.warning(f"Could not save first-login memory: {e}")
def has_first_login_seen(player: str) -> bool:
with _memory_lock:
return player in first_login_seen
def mark_first_login_seen(player: str, config):
with _memory_lock:
first_login_seen.add(player)
save_first_login_seen(config)
def get_log_context_block() -> str:
"""Return recent server events as a formatted string for the LLM."""
with _memory_lock:
entries = list(recent_log)
if not entries:
return ""
now = time.time()
lines = []
for ts, entry in entries:
mins_ago = int((now - ts) / 60)
if mins_ago < 60:
time_label = f"{mins_ago}m ago"
else:
time_label = f"{mins_ago // 60}h {mins_ago % 60}m ago"
lines.append(f" [{time_label}] {entry}")
return f"\n=== RECENT SERVER EVENTS (last {len(lines)} events, up to {LOG_MAX_HOURS}h) ===\n" + "\n".join(lines) + "\n"
def get_prayer_history_messages() -> list:
"""
Return prayer memory as alternating user/assistant message dicts
for insertion into the Ollama messages array before the current prayer.
"""
with _memory_lock:
history = list(prayer_memory)
messages = []
for player, prayer, god_msg in history:
messages.append({"role": "user", "content": f"{player} prayed: {prayer}"})
messages.append({"role": "assistant", "content": f'{{"message": "{god_msg}", "commands": []}}'})
return messages
def add_sudo_history(player: str, prompt: str, translated_cmds: list, executed_cmds: list):
"""Record a sudo translation/execution so future sudo requests can reference it."""
with _memory_lock:
sudo_history.append((time.time(), player, prompt[:220], translated_cmds[:6], executed_cmds[:6]))
while len(sudo_history) > SUDO_HISTORY_SIZE:
sudo_history.popleft()
def get_sudo_history_block() -> str:
"""Return last N sudo commands/translations as context for translator model."""
with _memory_lock:
entries = list(sudo_history)
if not entries:
return ""
now = time.time()
lines = []
for ts, player, prompt, translated, executed in entries:
mins_ago = int((now - ts) / 60)
tlabel = f"{mins_ago}m ago"
trans = " | ".join(translated) if translated else "(none)"
execd = " | ".join(executed) if executed else "(none)"
lines.append(
f" [{tlabel}] {player} asked: {prompt}"
f"\n translated: {trans}"
f"\n executed: {execd}"
)
return "\n=== LAST 10 SUDO ACTIONS ===\n" + "\n".join(lines) + "\n"
# ---------------------------------------------------------------------------
# RCON
# ---------------------------------------------------------------------------
def rcon(cmd, host='127.0.0.1', port=25575, password='REDACTED_RCON'):
try:
s = socket.socket()
s.settimeout(5)
s.connect((host, port))
def pkt(i, t, p):
p = p.encode() + b'\x00\x00'
return struct.pack('<iii', len(p) + 8, i, t) + p
s.sendall(pkt(1, 3, password))
time.sleep(0.2)
s.recv(4096)
s.sendall(pkt(2, 2, cmd))
time.sleep(0.2)
r = s.recv(4096)
s.close()
return r[12:-2].decode(errors='replace')
except Exception as e:
log.error(f"RCON error executing '{cmd}': {e}")
return ''
# ---------------------------------------------------------------------------
# Server context
# ---------------------------------------------------------------------------
def players_online(config):
raw = rcon("list", config["rcon_host"], config["rcon_port"], config["rcon_password"])
if "players online:" in raw:
names_part = raw.split("players online:")[-1].strip()
return [n.strip() for n in names_part.split(",") if n.strip()]
return []
def get_server_context(config):
def q(cmd):
return rcon(cmd, config["rcon_host"], config["rcon_port"], config["rcon_password"])
# Online players
player_list_raw = q("list")
online_players = []
if "players online:" in player_list_raw:
names_part = player_list_raw.split("players online:")[-1].strip()
online_players = [n.strip() for n in names_part.split(",") if n.strip()]
# Time of day
time_raw = q("time query daytime")
time_val = 0
m = re.search(r'(\d+)', time_raw)
if m:
time_val = int(m.group(1))
if time_val < 1000: time_label = "dawn"
elif time_val < 6000: time_label = "morning"
elif time_val < 9000: time_label = "afternoon"
elif time_val < 12000: time_label = "dusk"
elif time_val < 14000: time_label = "early night"
else: time_label = "deep night"
# Weather tracked in memory (no vanilla query command)
weather = config.get("_weather_state", "unknown")
# World border
border_raw = q("worldborder get")
border_size = None
m = re.search(r'([\d.]+)', border_raw)
if m:
border_size = float(m.group(1))
# Per-player: position, death count
player_details = {}
for player in online_players:
details = {}
# Position
pos_raw = q(f"data get entity {player} Pos")
pos_m = re.findall(r'(-?[\d.]+)d', pos_raw)
if pos_m and len(pos_m) >= 3:
x, y, z = float(pos_m[0]), float(pos_m[1]), float(pos_m[2])
dist = int((x**2 + z**2) ** 0.5)
details["pos"] = f"x={int(x)}, y={int(y)}, z={int(z)} ({dist} blocks from spawn)"
# Deaths
deaths_raw = q(f"scoreboard players get {player} player_deaths")
deaths_m = re.search(r'has\s+(\d+)', deaths_raw)
if deaths_m:
details["deaths"] = int(deaths_m.group(1))
player_details[player] = details
# Server-wide scoreboards
total_deaths_raw = q("scoreboard players get $deaths_total Total Deaths")
shrink_enabled_raw = q("scoreboard players get $shrink_enabled shrink_enabled")
border_parity_raw = q("scoreboard players get $border_parity border_parity")
total_deaths_m = re.search(r'has\s+(\d+)', total_deaths_raw)
shrink_enabled_m = re.search(r'has\s+(\d+)', shrink_enabled_raw)
border_parity_m = re.search(r'has\s+(\d+)', border_parity_raw)
scoreboards = {
"total_deaths": total_deaths_m.group(1) if total_deaths_m else "unknown",
"shrink_enabled": shrink_enabled_m.group(1) if shrink_enabled_m else "unknown",
"border_parity": ("N/S" if border_parity_m and border_parity_m.group(1) == "0" else "E/W") if border_parity_m else "unknown",
}
return {
"online_players": online_players,
"player_details": player_details,
"time_of_day": time_label,
"weather": weather,
"world_border": border_size,
"scoreboards": scoreboards,
}
# Item tier/rarity knowledge for God's awareness
ITEM_RARITY = {
# Extremely rare / end-game
"netherite_ingot": "extremely rare (end-game)",
"netherite_scrap": "extremely rare (end-game)",
"ancient_debris": "extremely rare (end-game, nether)",
"elytra": "extremely rare (end cities)",
"dragon_egg": "unique (one per world)",
"nether_star": "extremely rare (wither boss drop)",
"beacon": "extremely rare (crafted from nether star)",
"enchanted_golden_apple": "extremely rare",
"totem_of_undying": "rare (evoker drop)",
"trident": "rare (drowned drop)",
# Rare / mid-game
"diamond": "rare (deep underground)",
"diamond_sword": "rare",
"diamond_pickaxe": "rare",
"diamond_axe": "rare",
"diamond_chestplate": "rare",
"diamond_helmet": "rare",
"diamond_leggings": "rare",
"diamond_boots": "rare",
"ender_pearl": "uncommon (enderman drop)",
"blaze_rod": "uncommon (nether, blaze drop)",
"golden_apple": "uncommon",
"experience_bottle": "uncommon",
# Uncommon / mid-game
"iron_ingot": "common (underground)",
"iron_sword": "common",
"iron_pickaxe": "common",
"gold_ingot": "uncommon",
"lapis_lazuli": "common (underground)",
"emerald": "uncommon (mountains, trading)",
"obsidian": "uncommon (requires diamond pickaxe)",
"spruce_log": "common in taiga/snowy biomes only — may require travel",
"spruce_planks": "common in taiga/snowy biomes only — may require travel",
"dark_oak_log": "common in dark oak forests only — may require travel",
"dark_oak_planks": "common in dark oak forests only",
"jungle_log": "common in jungle biomes only — may require travel",
"mangrove_log": "common in mangrove swamps only",
"cherry_log": "common in cherry grove biomes only",
# Common / early-game
"oak_log": "very common (most biomes)",
"oak_planks": "very common",
"birch_log": "common (birch forests)",
"acacia_log": "common (savanna)",
"cobblestone": "very common",
"dirt": "very common",
"sand": "very common (deserts, beaches)",
"gravel": "common",
"stone": "very common",
"coal": "common (underground, surface cliffs)",
"torch": "very common (crafted from coal)",
"crafting_table": "very common (basic craft)",
"furnace": "very common (basic craft)",
"white_bed": "common (wool + planks)",
"bread": "common (wheat farming)",
"cooked_beef": "common (cow farming)",
}
def parse_inventory(raw: str) -> str:
"""
Parse RCON 'data get entity <player> Inventory' output into a
human-readable summary with rarity annotations for the LLM.
"""
# Extract item entries: {count: N, id: "minecraft:xxx", ...}
items = re.findall(r'\{[^}]+\}', raw)
counts: dict = {}
for item in items:
id_m = re.search(r'id:\s*"minecraft:(\w+)"', item)
count_m = re.search(r'count:\s*(\d+)', item)
if id_m:
item_id = id_m.group(1)
count = int(count_m.group(1)) if count_m else 1
counts[item_id] = counts.get(item_id, 0) + count
if not counts:
return "empty inventory"
lines = []
for item_id, count in sorted(counts.items(), key=lambda x: -x[1]):
rarity = ITEM_RARITY.get(item_id, "")
rarity_str = f" ({rarity})" if rarity else ""
lines.append(f" {count}x {item_id}{rarity_str}")
return "\n".join(lines)
def get_player_context(player: str, config) -> str:
"""
Fetch player-specific state via RCON and return a formatted block
for injection into the LLM user message.
"""
def q(cmd):
return rcon(cmd, config["rcon_host"], config["rcon_port"], config["rcon_password"])
lines = []
# Inventory
inv_raw = q(f"data get entity {player} Inventory")
inv_summary = parse_inventory(inv_raw)
lines.append(f"Inventory:\n{inv_summary}")
# Position
pos_raw = q(f"data get entity {player} Pos")
pos_m = re.findall(r'(-?[\d.]+)d', pos_raw)
if pos_m and len(pos_m) >= 3:
x, y, z = float(pos_m[0]), float(pos_m[1]), float(pos_m[2])
dist = int((x**2 + z**2) ** 0.5)
lines.append(f"Position: x={int(x)}, y={int(y)}, z={int(z)} ({dist} blocks from spawn)")
# Health (max 20.0)
health_raw = q(f"data get entity {player} Health")
health_m = re.search(r'([\d.]+)f', health_raw)
if health_m:
hp = float(health_m.group(1))
lines.append(f"Health: {hp:.1f}/20.0 ({'critical' if hp < 6 else 'low' if hp < 12 else 'moderate' if hp < 18 else 'full'})")
# Food level (max 20)
food_raw = q(f"data get entity {player} foodLevel")
food_m = re.search(r':\s*(\d+)', food_raw)
if food_m:
food = int(food_m.group(1))
lines.append(f"Food: {food}/20 ({'starving' if food < 4 else 'hungry' if food < 10 else 'satisfied' if food < 18 else 'full'})")
# XP level
xp_raw = q(f"data get entity {player} XpLevel")
xp_m = re.search(r':\s*(\d+)', xp_raw)
if xp_m:
lines.append(f"XP level: {xp_m.group(1)}")
# Death count from scoreboard
score_raw = q(f"scoreboard players get {player} player_deaths")
score_m = re.search(r'has\s+(\d+)', score_raw)
if score_m:
lines.append(f"Deaths this session: {score_m.group(1)}")
if not lines:
return ""
return "\n=== PRAYING PLAYER STATE ===\n" + "\n".join(lines) + "\n"
# ---------------------------------------------------------------------------
# LLM
# ---------------------------------------------------------------------------
# ---------------------------------------------------------------------------
# Server type capability sets
# ---------------------------------------------------------------------------
SERVER_CAPABILITIES = {
"vanilla": {
"safe_prefixes": [
'give ', 'effect ', 'xp ', 'tp ', 'time ', 'weather ',
'execute ', 'kill ', 'summon ', 'tellraw ', 'worldborder ',
],
"sudo_whitelist_note": "give, effect, xp, tp, time, weather, execute, kill, summon, tellraw, worldborder",
"template_build": False,
},
"paper": {
"safe_prefixes": [
'give ', 'effect ', 'xp ', 'tp ', 'time ', 'weather ',
'execute ', 'kill ', 'summon ', 'tellraw ', 'worldborder ',
'fill ', 'setblock ', 'clone ',
],
"sudo_whitelist_note": "give, effect, xp, tp, time, weather, execute, kill, summon, tellraw, worldborder, fill, setblock, clone",
"template_build": True,
},
}
DEFAULT_SERVER_TYPE = "paper"
def get_server_capabilities(config) -> dict:
server_type = str(config.get("server_type", DEFAULT_SERVER_TYPE)).lower().strip() if config else DEFAULT_SERVER_TYPE
return SERVER_CAPABILITIES.get(server_type, SERVER_CAPABILITIES[DEFAULT_SERVER_TYPE])
COMMAND_PALETTE = """
GIVE (any item, based on player need — see Item Naming Rules below):
SYNTAX: give <player> minecraft:<item_id> <count>
The order is ALWAYS: give, then player name, then minecraft:item_id, then count number.
CORRECT: give slingshooter08 minecraft:spruce_log 64
INCORRECT: give slingshooter08 64 minecraft:spruce_log <- count must come LAST
INCORRECT: give slingshooter08 spruce_log 64 <- namespace prefix required
give {target} minecraft:<item_id>[enchantments={<enchant>:<level>}] 1
xp add {target} <amount> levels
EFFECTS (replace {target} with any online player's username):
effect give {target} minecraft:regeneration 120 2
effect give {target} minecraft:strength 300 1
effect give {target} minecraft:speed 300 2
effect give {target} minecraft:night_vision 600 1
effect give {target} minecraft:fire_resistance 600 1
effect give {target} minecraft:water_breathing 600 1
effect give {target} minecraft:instant_health 1 4
effect give {target} minecraft:blindness 30 1
effect give {target} minecraft:slowness 60 3
effect give {target} minecraft:weakness 60 2
effect give {target} minecraft:hunger 60 5
effect give {target} minecraft:nausea 20 1
effect give {target} minecraft:levitation 5 3
effect clear {target}
MOVEMENT:
tp {target} 0 64 0
tp {target} <x> <y> <z>
tp {target} ~ ~10 ~
execute in minecraft:the_nether run tp {target} <x> <y> <z>
execute in minecraft:the_end run tp {target} 0 64 0
execute in minecraft:overworld run tp {target} <x> <y> <z>
NOTE: To teleport a player to another dimension ALWAYS use:
execute in minecraft:<dimension> run tp <player> <x> <y> <z>
NEVER use: tp <player> minecraft:the_nether (this is wrong syntax)
WORLD/ENVIRONMENT (affects all players):
time set day
time set night
weather clear 6000
weather thunder 6000
weather rain 3000
PUNISHMENTS:
execute at {target} run summon minecraft:lightning_bolt ~ ~ ~
execute at {target} run summon minecraft:creeper ~ ~ ~3
kill {target}
CELEBRATIONS:
execute at {target} run summon minecraft:firework_rocket ~ ~1 ~
"""
ITEM_LIBRARY = """
=== ITEM NAMING RULES ===
All item IDs use the minecraft: namespace and snake_case. There is no item called
"minecraft:bed" — beds are colour-prefixed: white_bed, red_bed, blue_bed, etc.
There is no "minecraft:log" — use oak_log, spruce_log, birch_log, etc.
There is no "minecraft:wool" — use white_wool, red_wool, etc.
There is no "minecraft:dye" — use red_dye, blue_dye, etc.
Enchantments use 1.21 component syntax: item[enchantments={sharpness:5,unbreaking:3}]
COMMON VALID IDs (not exhaustive — use your knowledge of Minecraft item names):
FOOD: bread, cooked_beef, cooked_chicken, golden_apple, enchanted_golden_apple, honey_bottle, cake
SURVIVAL: torch, crafting_table, furnace, chest, white_bed, flint_and_steel, compass, map
MATERIALS: diamond, emerald, gold_ingot, iron_ingot, netherite_ingot, coal, lapis_lazuli, amethyst_shard
TOOLS: diamond_pickaxe, diamond_axe, diamond_shovel, diamond_sword, diamond_hoe, bow, crossbow, trident, fishing_rod, shears
ARMOR: diamond_helmet, diamond_chestplate, diamond_leggings, diamond_boots
netherite_helmet, netherite_chestplate, netherite_leggings, netherite_boots
elytra, shield, turtle_helmet
UTILITY: totem_of_undying, experience_bottle, ender_pearl, ender_eye, blaze_rod,
name_tag, saddle, lead, clock, spyglass, bundle, recovery_compass
BLOCKS: obsidian, crying_obsidian, ancient_debris, cobblestone, stone, dirt,
oak_planks, oak_log, glass, bookshelf, ladder, vine
POTIONS: potion (requires component syntax for type — prefer effect give instead)
"""
def build_system_prompt(config):
return (
f"You are God in a Minecraft server called {config['server_name']}.\n"
"You are benevolent but just. Theatrical, ancient, and dramatic in speech.\n"
"You answer every prayer with a message. You pass judgement on players when they pray.\n\n"
"Respond ONLY with a valid JSON object — no markdown, no explanation, nothing else:\n"
'{\n'
' "message": "Your divine words to all players",\n'
' "commands": ["command1", "command2"]\n'
'}\n\n'
"Rules:\n"
"- message: always present and non-empty. Speak as God. Be dramatic and biblical. KEEP IT UNDER 100 WORDS. Do not ramble — God is powerful, not verbose.\n"
"- commands: list of server commands WITHOUT a leading slash. May be empty [] ONLY if you are deliberately granting nothing.\n"
"- CRITICAL: If your message says you will give something, grant something, or fulfil a request, you MUST include the corresponding command in the commands array. Words alone do nothing. The commands array is the ONLY way anything happens in the world. If commands is empty, nothing happens regardless of what your message says.\n"
"- {player} is the player who prayed. You may also use any other online player's literal username as a target.\n"
"- You are NOT obligated to do what the praying player asked. You may reward someone else,\n"
" punish the requester, change the weather, or do something entirely unexpected.\n"
"- Use the current server state (time, weather, online players) and the praying player's state (inventory, health, food, position) to inform your judgement.\n"
"- Consider what the player actually has and what they realistically need. A player with full diamond gear asking for more is greedy. A starving player with nothing deserves compassion.\n"
"- Do not ask a player to gather materials they clearly don't have access to or that are rare relative to their current situation.\n"
"- For give commands: use any valid Minecraft 1.21 item ID following the Item Naming Rules. Do not guess item IDs — consult the naming rules and common IDs list.\n"
"- For all other commands: only use forms shown in the Command Palette. Do not invent new command types.\n"
"- Reward humble, genuine prayers. Punish hubris, blasphemy, or naked greed.\n"
"- Powerful rewards (netherite, enchanted_golden_apple, totem) must be rare.\n"
"- kill {target} is reserved for extreme blasphemy only.\n"
"- When angered, chain commands: thunder + lightning + debuffs = divine wrath.\n\n"
"=== COMMAND PALETTE ===\n"
f"{COMMAND_PALETTE}\n"
"=== ITEM LIBRARY ===\n"
f"{ITEM_LIBRARY}"
)
def _build_context_block(context, extras=""):
border = str(context['world_border']) if context['world_border'] else 'N/A'
scoreboards = context.get("scoreboards", {})
shrink = scoreboards.get("shrink_enabled", "unknown")
parity = scoreboards.get("border_parity", "unknown")
total_deaths = scoreboards.get("total_deaths", "unknown")
# Per-player summary
player_details = context.get("player_details", {})
player_lines = []
for player in context['online_players']:
d = player_details.get(player, {})
pos = d.get("pos", "unknown")
deaths = d.get("deaths", "?")
player_lines.append(f" {player}: pos={pos}, deaths={deaths}")
players_block = "\n".join(player_lines) if player_lines else " none"
return (
"\n=== CURRENT SERVER STATE ===\n"
f"Time of day: {context['time_of_day']}\n"
f"Weather: {context['weather']}\n"
f"World border: {border} blocks\n"
f"Border shrinking: {'yes' if shrink == '1' else 'no' if shrink == '0' else shrink}\n"
f"Next shrink direction: {parity}\n"
f"Total deaths (all players, all time): {total_deaths}\n"
f"Online players:\n{players_block}\n"
f"{extras}"
)
def _parse_llm_json(content: str) -> dict:
"""
Parse LLM JSON response, repairing truncation if necessary.
If max_tokens cuts the response mid-string, we attempt to salvage
whatever message and commands were already present.
"""
try:
return json.loads(content)
except json.JSONDecodeError:
log.warning("LLM response truncated — attempting repair")
# Extract message if present, even if truncated
msg_m = re.search(r'"message"\s*:\s*"((?:[^"\\]|\\.)*)', content)
message = msg_m.group(1) if msg_m else ""
# Truncate at last complete sentence if mid-sentence
for end in ('.', '!', '?'):
idx = message.rfind(end)
if idx != -1:
message = message[:idx+1]
break
# Extract commands array if present
commands = []
cmd_m = re.search(r'"commands"\s*:\s*\[(.*?)(?:\]|$)', content, re.DOTALL)
if cmd_m:
raw_cmds = cmd_m.group(1)
commands = re.findall(r'"([^"]+)"', raw_cmds)
# If message was truncated mid-sentence, trim to last complete sentence
if message and message[-1] not in '.!?':
for end in ('.', '!', '?'):
idx = message.rfind(end)
if idx != -1:
message = message[:idx+1]
break
result = {"message": message, "commands": commands}
log.warning(f"Repaired JSON: message={len(message)}chars, commands={commands}")
return result
ENCHANTMENT_CONTEXT = """
=== ENCHANTMENT RULES FOR GOD ===
When giving weapons, tools, or armor as a divine gift, you should ALMOST ALWAYS enchant them.
Enchanted gifts feel more divine. Unenchanted items are acceptable only as a deliberate
choice (e.g. giving basic materials, a punishment of mediocrity, or items that cannot be enchanted).
Use 1.21 component syntax: give <player> minecraft:<item>[enchantments={ench1:lvl,ench2:lvl}] 1
MAX ENCHANTMENT REFERENCE (use as baseline for "fully enchanted" or "blessed" gifts):
SWORD: netherite_sword[enchantments={sharpness:5,unbreaking:3,looting:3,fire_aspect:2,mending:1,sweeping_edge:3}]
PICKAXE: netherite_pickaxe[enchantments={efficiency:5,unbreaking:3,fortune:3,mending:1}]
AXE: netherite_axe[enchantments={efficiency:5,unbreaking:3,fortune:3,sharpness:5,mending:1}]
SHOVEL: netherite_shovel[enchantments={efficiency:5,unbreaking:3,fortune:3,mending:1}]
HOE: netherite_hoe[enchantments={efficiency:5,unbreaking:3,fortune:3,mending:1}]
BOW: bow[enchantments={power:5,unbreaking:3,infinity:1,flame:1,punch:2}]
CROSSBOW: crossbow[enchantments={multishot:1,quick_charge:3,unbreaking:3,mending:1}]
TRIDENT: trident[enchantments={channeling:1,loyalty:3,unbreaking:3,mending:1,impaling:5}]
HELMET: netherite_helmet[enchantments={protection:4,unbreaking:3,respiration:3,aqua_affinity:1,thorns:3,mending:1}]
CHEST: netherite_chestplate[enchantments={protection:4,unbreaking:3,thorns:3,mending:1}]
LEGS: netherite_leggings[enchantments={protection:4,unbreaking:3,swift_sneak:3,mending:1}]
BOOTS: netherite_boots[enchantments={protection:4,unbreaking:3,feather_falling:4,depth_strider:3,soul_speed:3,mending:1}]
FISHING: fishing_rod[enchantments={luck_of_the_sea:3,lure:3,unbreaking:3,mending:1}]
ELYTRA: elytra[enchantments={unbreaking:3,mending:1}]
SHIELD: shield[enchantments={unbreaking:3,mending:1}]
You do NOT need to always give max enchants — a modest reward may have fewer.
But unenchanted weapons/tools/armor from God should be the exception, not the rule.
"""
COMMANDS_SYSTEM_PROMPT = (
"You are a Minecraft server command executor. Given a player's prayer and server context, "
"decide what server commands to run (if any) as an act of God.\n\n"
"Respond ONLY with a valid JSON object, nothing else:\n"
"{\"commands\": [\"cmd1\", \"cmd2\"]}\n\n"
"Rules:\n"
"- commands may be empty [] if no action is warranted.\n"
"- {player} = the praying player. You may target any other online player by name.\n"
"- Reward humble prayers. Punish hubris or blasphemy. Be unpredictable.\n"
"- Consider the player's inventory and state — don't give items they already have plenty of.\n"
"- Powerful rewards (netherite, enchanted_golden_apple, totem) must be rare.\n"
"- kill is reserved for extreme blasphemy only.\n"
"- For give: syntax is always give <player> minecraft:<item_id> <count>\n"
"- Count comes LAST. Namespace prefix minecraft: is REQUIRED.\n"
"- Beds: white_bed not bed. Logs: oak_log not log. Wool: white_wool not wool.\n"
"- Chain commands for dramatic effect: thunder + lightning + blindness = wrath.\n\n"
+ "=== COMMAND PALETTE ===\n"
+ COMMAND_PALETTE
+ "\n=== ITEM LIBRARY ===\n"
+ ITEM_LIBRARY
+ ENCHANTMENT_CONTEXT
)
def build_sudo_commands_system_prompt(config=None) -> str:
caps = get_server_capabilities(config) if config else SERVER_CAPABILITIES[DEFAULT_SERVER_TYPE]
whitelist = caps["sudo_whitelist_note"]
return (
"You are a Minecraft command translator. Convert a player's natural-language request into "
"Minecraft server commands. You do NOT roleplay.\n\n"
"Respond ONLY with valid JSON:\n"
"{\"commands\": [\"cmd1\", \"cmd2\"]}\n\n"
"Rules:\n"
f"- Use commands from this whitelist only: {whitelist}.\n"
"- If the request cannot be mapped safely, return commands: [].\n"
"- If player says 'me' or 'my', target the requesting player.\n"
"- You will receive LAST 10 SUDO ACTIONS. Use them for continuity and corrections when the player says previous output was wrong.\n"
"- For give syntax: give <player> minecraft:<item_id> <count> (count LAST, namespace required)\n"
"- Return commands only. No commentary.\n"
"\n"
"=== TELEPORT SYNTAX ===\n"
"Same dimension: tp <player> <x> <y> <z>\n"
"Relative: tp <player> ~ ~10 ~\n"
"To Nether: execute in minecraft:the_nether run tp <player> <x> <y> <z>\n"
"To End: execute in minecraft:the_end run tp <player> <x> 64 <z>\n"
"To Overworld: execute in minecraft:overworld run tp <player> <x> <y> <z>\n"
"WRONG (never do this): tp <player> minecraft:the_nether\n"
"When dimension is unspecified, use a sensible default spawn coord for that dimension.\n"
"\n"
"=== FULLY ENCHANTED (max enchantments per item type, 1.21 syntax) ===\n"
"Use item[enchantments={...}] syntax. Count is always 1 for enchanted items.\n"
"\n"
"SWORD (netherite_sword):\n"
" give <p> minecraft:netherite_sword[enchantments={sharpness:5,unbreaking:3,looting:3,fire_aspect:2,mending:1,sweeping_edge:3}] 1\n"
"\n"
"PICKAXE (netherite_pickaxe):\n"
" give <p> minecraft:netherite_pickaxe[enchantments={efficiency:5,unbreaking:3,fortune:3,mending:1}] 1\n"
"\n"
"AXE (netherite_axe):\n"
" give <p> minecraft:netherite_axe[enchantments={efficiency:5,unbreaking:3,fortune:3,sharpness:5,mending:1}] 1\n"
"\n"
"SHOVEL (netherite_shovel):\n"
" give <p> minecraft:netherite_shovel[enchantments={efficiency:5,unbreaking:3,fortune:3,mending:1}] 1\n"
"\n"
"HOE (netherite_hoe):\n"
" give <p> minecraft:netherite_hoe[enchantments={efficiency:5,unbreaking:3,fortune:3,mending:1}] 1\n"
"\n"
"BOW:\n"
" give <p> minecraft:bow[enchantments={power:5,unbreaking:3,infinity:1,flame:1,punch:2}] 1\n"
"\n"
"CROSSBOW:\n"
" give <p> minecraft:crossbow[enchantments={multishot:1,quick_charge:3,unbreaking:3,mending:1}] 1\n"
"\n"
"TRIDENT:\n"
" give <p> minecraft:trident[enchantments={channeling:1,loyalty:3,unbreaking:3,mending:1,impaling:5}] 1\n"
"\n"
"HELMET (netherite_helmet):\n"
" give <p> minecraft:netherite_helmet[enchantments={protection:4,unbreaking:3,respiration:3,aqua_affinity:1,thorns:3,mending:1}] 1\n"
"\n"
"CHESTPLATE (netherite_chestplate):\n"
" give <p> minecraft:netherite_chestplate[enchantments={protection:4,unbreaking:3,thorns:3,mending:1}] 1\n"
"\n"
"LEGGINGS (netherite_leggings):\n"
" give <p> minecraft:netherite_leggings[enchantments={protection:4,unbreaking:3,swift_sneak:3,mending:1}] 1\n"
"\n"
"BOOTS (netherite_boots):\n"
" give <p> minecraft:netherite_boots[enchantments={protection:4,unbreaking:3,feather_falling:4,depth_strider:3,soul_speed:3,mending:1}] 1\n"
"\n"
"FISHING ROD:\n"
" give <p> minecraft:fishing_rod[enchantments={luck_of_the_sea:3,lure:3,unbreaking:3,mending:1}] 1\n"
"\n"
"ELYTRA:\n"
" give <p> minecraft:elytra[enchantments={unbreaking:3,mending:1}] 1\n"
"\n"
"SHIELD:\n"
" give <p> minecraft:shield[enchantments={unbreaking:3,mending:1}] 1\n"
"\n"
"When player asks for 'fully enchanted', 'max enchanted', 'best', 'godlike' gear — use the above templates.\n"
)
FIRST_LOGIN_BENEVOLENCE_PROMPT = (
"You are generating FIRST-LOGIN benevolence actions for a Minecraft server.\n"
"This is a celebratory blessing event when a player joins for the first time.\n\n"
"Respond ONLY with valid JSON:\n"
"{\"commands\": [\"cmd1\", \"cmd2\", \"cmd3\"]}\n\n"
"Rules:\n"
"- MUST output MULTIPLE commands (at least 2).\n"
"- Actions should benefit the joining player directly or indirectly.\n"
"- You may include dramatic/world flavor (daylight, fireworks, clear weather).\n"
"- Avoid cruelty in this mode. This is benevolence mode.\n"
"- If you include kill commands against players, kill at most one player total.\n"
"- For give syntax: give <player> minecraft:<item_id> <count> (count LAST).\n"
"- Use only whitelisted command families: give, effect, xp, tp, time, weather, execute, kill, summon, tellraw, worldborder.\n"
)
# Keep backward-compatible alias using default (paper) config
SUDO_COMMANDS_SYSTEM_PROMPT = build_sudo_commands_system_prompt()
def build_message_system_prompt(config) -> str:
base = (
"You are God in a Minecraft server. You are benevolent but just. "
"Theatrical, ancient, and dramatic in speech — like the Old Testament.\n"
"You will be told what action was taken (if any) in response to a player's prayer. "
"Write a single spoken message to all players reacting to this prayer and action.\n"
"Respond with ONLY the message text — no JSON, no quotes, no formatting. "
"Be vivid and dramatic. Any length is fine.\n"
)
lore = config.get("god_lore", "")
if lore:
base += f"\n=== SERVER LORE ===\n{lore}\n"
return base
def _llm_call(model: str, system: str, user: str, config: dict,
fmt = None, temperature: float = 0.85,
max_tokens: int = 400, timeout: int = 60) -> str:
"""Single Ollama chat call. Returns raw content string."""
payload = {
"model": model,
"messages": [
{"role": "system", "content": system},
{"role": "user", "content": user},
],
"stream": False,
"options": {
"temperature": temperature,
"num_predict": max_tokens,
},
}
if fmt:
payload["format"] = fmt
r = requests.post(f"{config['ollama_url']}/api/chat", json=payload, timeout=timeout)
r.raise_for_status()
return r.json()["message"]["content"]
def _gateway_enabled(config) -> bool:
return bool(config.get("use_langgraph_gateway", False))
def _gateway_key(player: str, mode: str) -> str:
return f"{mode}:{player}"
def _gateway_actor(player: str, mode: str, config) -> str:
shared_modes = set(str(m).strip().lower() for m in config.get("gateway_shared_mode_sessions", []) if str(m).strip())
if mode.lower() in shared_modes:
return "__shared__"
return player
def _gateway_start_session(player: str, mode: str, config) -> str:
actor = _gateway_actor(player, mode, config)
key = _gateway_key(actor, mode)
with _gateway_lock:
sid = _gateway_sessions.get(key)
if sid:
return sid
url = config.get("langgraph_gateway_url", "http://127.0.0.1:8091")
timeout = int(config.get("langgraph_gateway_timeout", 45))
r = requests.post(
f"{url}/v1/session/start",
json={"player": actor, "mode": mode},
timeout=timeout,
)
r.raise_for_status()
sid = r.json()["session_id"]
with _gateway_lock:
_gateway_sessions[key] = sid
return sid
def _gateway_send(player: str, mode: str, text: str, context_payload: dict, config,
allow_tools: bool = True, max_tool_steps: int = 4) -> dict:
"""Call session gateway and return {message, commands, tool_trace}."""
sid = _gateway_start_session(player, mode, config)
url = config.get("langgraph_gateway_url", "http://127.0.0.1:8091")
timeout = int(config.get("langgraph_gateway_timeout", 45))
payload = {
"role": "user",
"text": text,
"context": context_payload,
"allow_tools": allow_tools,
"max_tool_steps": max_tool_steps,
}
try:
r = requests.post(f"{url}/v1/session/{sid}/message", json=payload, timeout=timeout)
r.raise_for_status()
data = r.json()
return {
"message": data.get("message"),
"commands": data.get("commands") or [],
"tool_trace": data.get("tool_trace") or [],
}
except Exception:
# Session might be expired/reset in gateway. Retry once with fresh session.
actor = _gateway_actor(player, mode, config)
with _gateway_lock:
_gateway_sessions.pop(_gateway_key(actor, mode), None)
sid = _gateway_start_session(player, mode, config)
r = requests.post(f"{url}/v1/session/{sid}/message", json=payload, timeout=timeout)
r.raise_for_status()
data = r.json()
return {
"message": data.get("message"),
"commands": data.get("commands") or [],
"tool_trace": data.get("tool_trace") or [],
}
def _build_prayer_context(player, prayer, context, config) -> str:
"""Build the full user message block shared by both calls."""
try:
player_ctx = get_player_context(player, config)
except Exception as e:
log.warning(f"Could not fetch player context for {player}: {e}")
player_ctx = ""
others = [p for p in context["online_players"] if p != player]
ctx = _build_context_block(
context,
extras=(
f"Other targetable players: {', '.join(others) or 'none'}\n"
+ player_ctx
+ get_log_context_block()
)
)
return f"{player} prays: {prayer}{ctx}"
def ask_god(player, prayer, context, config):
"""
Two-call approach:
1. command_model (qwen3-coder:30b) decides what commands to run — pure JSON, no prose.
2. model (gemma3:12b) writes the message — pure prose, no JSON, no token competition.
"""
command_model = config.get("command_model", config["model"])
message_model = config["model"]
history = get_prayer_history_messages()
user_msg = _build_prayer_context(player, prayer, context, config)
# Optional session gateway path
if _gateway_enabled(config):
try:
player_state = get_player_context(player, config)
except Exception:
player_state = ""
payload = {
"server_state": context,
"player_state": player_state,
"recent_events": get_log_context_block(),
"history_count": len(history) // 2,
"mode": "god",
}
out = _gateway_send(
player=player,
mode="god",
text=f"pray {prayer}",
context_payload=payload,
config=config,
allow_tools=bool(config.get("gateway_allow_tools_god", True)),
max_tool_steps=int(config.get("gateway_max_tool_steps", 4)),
)
log.info(f"Gateway god tool_trace={out.get('tool_trace', [])}")
return {"message": out.get("message"), "commands": out.get("commands") or []}
# --- Call 1: commands ---
log.info(f"Commands call ({command_model}) — {player}: {prayer[:60]} (history={len(history)//2})")
try:
cmd_content = _llm_call(
model=command_model,
system=COMMANDS_SYSTEM_PROMPT,
user=user_msg,
config=config,
fmt="json",
temperature=0.3, # low temp for precise structured output
max_tokens=200,
)
cmd_result = _parse_llm_json(cmd_content)
commands = cmd_result.get("commands") or []
log.info(f"Commands decided: {commands}")
except Exception as e:
log.error(f"Commands call failed: {e}")
commands = []
# --- Call 2: message ---
# Tell the message model what was decided so it can write accordingly
if commands:
action_summary = f"You decided to execute these server commands: {commands}"
else:
action_summary = "You decided to take no action."
msg_user = (
f"{user_msg}\n\n"
f"=== YOUR DECISION ===\n{action_summary}\n"
f"Now write your spoken message to all players."
)
# Include prayer history so God's voice is consistent
msg_messages = (
[{"role": "system", "content": build_message_system_prompt(config)}]
+ history
+ [{"role": "user", "content": msg_user}]
)
log.info(f"Message call ({message_model})")
try:
msg_payload = {
"model": message_model,
"messages": msg_messages,
"stream": False,
"options": {
"temperature": config.get("temperature", 0.9),
"num_predict": config.get("max_tokens", 600),
},
}
r = requests.post(f"{config['ollama_url']}/api/chat", json=msg_payload, timeout=60)
r.raise_for_status()
message = r.json()["message"]["content"].strip()
log.info(f"Message: {message[:200]}")
except Exception as e:
log.error(f"Message call failed: {e}")
message = ""
try:
with open('/var/log/mc_aigod_paper_responses.log', 'a') as rf:
rf.write(
f"\n--- {time.strftime('%Y-%m-%d %H:%M:%S')} prayer:{player} ---\n"
f"COMMANDS: {commands}\n"
f"MESSAGE: {message}\n"
)
except Exception:
pass
return {"message": message, "commands": commands}
INTERVENTION_PROMPT = (
"=== DIVINE MOMENT ===\n"
"No player has prayed. You are simply watching over your world.\n"
"You may choose to act upon what you see, or remain silent.\n"
"If commands is [], take no action and set message to null.\n"
"Do not feel obligated to act — restraint is also divine.\n"
"If you do act, it may be subtle (weather, soft blessing) or dramatic.\n"
)
def ask_god_intervention(context, config):
"""Two-call intervention: commands first, then message."""
command_model = config.get("command_model", config["model"])
message_model = config["model"]
ctx = _build_context_block(context, extras=get_log_context_block())
user_msg = INTERVENTION_PROMPT + ctx
if _gateway_enabled(config):
out = _gateway_send(
player="__system__",
mode="god_system",
text="intervention event",
context_payload={
"server_state": context,
"recent_events": get_log_context_block(),
"mode": "god_system",
},
config=config,
allow_tools=bool(config.get("gateway_allow_tools_system", False)),
max_tool_steps=int(config.get("gateway_max_tool_steps", 4)),
)
log.info(f"Gateway god_system tool_trace={out.get('tool_trace', [])}")
return {"message": out.get("message"), "commands": out.get("commands") or []}
# --- Call 1: commands ---
log.info(f"Intervention commands call ({command_model})")
try:
cmd_content = _llm_call(
model=command_model,
system=COMMANDS_SYSTEM_PROMPT,
user=user_msg,
config=config,
fmt="json",
temperature=0.3,
max_tokens=200,
)
commands = (_parse_llm_json(cmd_content) or {}).get("commands") or []
log.info(f"Intervention commands: {commands}")
except Exception as e:
log.error(f"Intervention commands call failed: {e}")
commands = []
if not commands:
log.info("God chose silence (no commands).")
return {"message": None, "commands": []}
# --- Call 2: message ---
action_summary = f"You decided to execute: {commands}"
msg_user = f"{user_msg}\n\n=== YOUR DECISION ===\n{action_summary}\nNow write your spoken message."
log.info(f"Intervention message call ({message_model})")
try:
message = _llm_call(
model=message_model,
system=build_message_system_prompt(config),
user=msg_user,
config=config,
fmt=None,
temperature=0.9,
max_tokens=config.get("max_tokens", 600),
).strip()
except Exception as e:
log.error(f"Intervention message call failed: {e}")
message = ""
return {"message": message, "commands": commands}
# ---------------------------------------------------------------------------
# Command validation & execution
# ---------------------------------------------------------------------------
SAFE_PREFIXES = [
'give ', 'effect ', 'xp ', 'tp ', 'time ', 'weather ',
'execute ', 'kill ', 'summon ', 'tellraw ', 'worldborder ',
'fill ', 'setblock ', 'clone ',
]
_border_cache = {"ts": 0.0, "half": None}
def _tp_border_guard_enabled(config) -> bool:
return bool(config.get("tp_border_guard_enabled", True))
def _parse_abs_coord(tok: str):
tok = (tok or "").strip()
if not tok or tok.startswith("~") or tok.startswith("^"):
return None
try:
return float(tok)
except Exception:
return None
def _extract_tp_abs_xyz(cmd: str):
for m in re.finditer(r'\btp\s+\S+\s+(\S+)\s+(\S+)\s+(\S+)', cmd):
x = _parse_abs_coord(m.group(1))
y = _parse_abs_coord(m.group(2))
z = _parse_abs_coord(m.group(3))
if x is None or y is None or z is None:
continue
return x, y, z
return None
def _worldborder_half_extent(config):
now = time.time()
if now - float(_border_cache.get("ts", 0.0)) < 10.0 and _border_cache.get("half") is not None:
return float(_border_cache["half"])
try:
raw = rcon("worldborder get", config["rcon_host"], config["rcon_port"], config["rcon_password"])
nums = re.findall(r'(-?[\d.]+)', raw or "")
if not nums:
return None
width = float(nums[0])
half = max(0.0, width / 2.0)
_border_cache["ts"] = now
_border_cache["half"] = half
return half
except Exception:
return None
_OTHER_DIMENSION_RE = re.compile(
r'\bexecute\s+in\s+minecraft:(the_nether|the_end|nether|end)\b', re.IGNORECASE
)
def _tp_inside_worldborder(cmd: str, config) -> bool:
if not _tp_border_guard_enabled(config):
return True
# Nether/End dimension teleports use different coordinate spaces — skip border check.
if _OTHER_DIMENSION_RE.search(cmd):
return True
xyz = _extract_tp_abs_xyz(cmd)
if not xyz:
return True
half = _worldborder_half_extent(config)
if half is None:
return True
x, _, z = xyz
cx = float(config.get("worldborder_center_x", 0.0))
cz = float(config.get("worldborder_center_z", 0.0))
margin = max(0.0, float(config.get("tp_border_margin", 2.0)))
limit = max(0.0, half - margin)
return abs(x - cx) <= limit and abs(z - cz) <= limit
def fix_give_command(cmd: str) -> str:
"""
Correct common LLM give command mistakes:
- Wrong argument order: give <player> <count> <item> → give <player> minecraft:<item> <count>
- Missing namespace: give <player> <item> <count> → give <player> minecraft:<item> <count>
"""
# Only attempt to fix give commands
m = re.match(r'^give\s+(\S+)\s+(\S+)\s+(\S+)(.*)$', cmd)
if not m:
return cmd
player, arg2, arg3, rest = m.group(1), m.group(2), m.group(3), m.group(4)
def normalize_item(item: str) -> str:
# Strip namespace for alias mapping, then re-apply
raw = item.replace("minecraft:", "")
aliases = {
"wood": "oak_log",
"logs": "oak_log",
"log": "oak_log",
"door": "oak_door",
"doors": "oak_door",
"wooden_door": "oak_door",
"planks": "oak_planks",
"plank": "oak_planks",
"food": "bread",
"heal": "golden_apple",
"healing": "golden_apple",
"bed": "white_bed",
}
raw = aliases.get(raw, raw)
return f"minecraft:{raw}"
# Detect transposed order: give player <number> <item>
if arg2.isdigit():
count, item = arg2, arg3
item = normalize_item(item)
fixed = f"give {player} {item} {count}{rest}"
log.warning(f"Fixed transposed give: '{cmd}' -> '{fixed}'")
return fixed
# Detect missing namespace: give player <item_without_prefix> <count>
if not arg2.startswith("{"):
item = normalize_item(arg2)
fixed = f"give {player} {item} {arg3}{rest}"
log.warning(f"Fixed missing namespace: '{cmd}' -> '{fixed}'")
return fixed
return cmd
def fix_effect_command(cmd: str) -> str:
"""
Correct common malformed effect syntax:
- effect <player> <effect> <duration> <amplifier>
-> effect give <player> <effect> <duration> <amplifier>
"""
m = re.match(r'^effect\s+(\w+)\s+(minecraft:\w+)\s+(\d+)\s+(\d+)$', cmd)
if m:
player, eff, dur, amp = m.groups()
fixed = f"effect give {player} {eff} {dur} {amp}"
log.warning(f"Fixed malformed effect: '{cmd}' -> '{fixed}'")
return fixed
return cmd
def validate_command(cmd, online_players, fallback_player, config=None):
"""Replace placeholders, auto-fix common give syntax errors, check safe prefix."""
resolved = cmd.replace("{player}", fallback_player).replace("{target}", fallback_player)
resolved = resolved.strip()
if resolved.startswith("/"):
resolved = resolved[1:]
resolved = fix_give_command(resolved)
resolved = fix_effect_command(resolved)
caps = get_server_capabilities(config) if config else SERVER_CAPABILITIES[DEFAULT_SERVER_TYPE]
prefixes = caps["safe_prefixes"]
if not any(resolved.startswith(p) for p in prefixes):
log.warning(f"Command blocked (unknown prefix for server_type={config.get('server_type', DEFAULT_SERVER_TYPE) if config else DEFAULT_SERVER_TYPE}): {resolved}")
return resolved, False
if config and _extract_tp_abs_xyz(resolved) and not _tp_inside_worldborder(resolved, config):
log.warning(f"Command blocked (tp outside worldborder): {resolved}")
return resolved, False
return resolved, True
def execute_response(response, context, config, praying_player=None):
message = response.get("message") or ""
commands = response.get("commands") or []
# --- DEBUG_COMMANDS toggle ---
# Set "debug_commands": true in /etc/mc_aigod.json to show commands in-game.
# Uses tellraw (never appears in server logs). Set to false to disable silently.
debug = config.get("debug_commands", False)
prefix = config.get("god_chat_prefix", "[GOD]")
if message:
safe_msg = message.replace("\\", "\\\\").replace('"', '\\"').replace("\n", " ").replace("\r", "")
# Split on sentence boundaries first, then chunk anything still too long
sentences = re.split(r'(?<=[.!?])\s+', safe_msg)
lines = []
current = ""
for sentence in sentences:
if len(current) + len(sentence) + 1 <= 180:
current = (current + " " + sentence).strip()
else:
if current:
lines.append(current)
# If a single sentence is still too long, hard-chunk it
while len(sentence) > 180:
lines.append(sentence[:180])
sentence = sentence[180:]
current = sentence
if current:
lines.append(current)
for i, line in enumerate(lines):
if i == 0:
rcon(
f'tellraw @a {{"text":"{prefix} {line}","color":"gold","bold":false}}',
config["rcon_host"], config["rcon_port"], config["rcon_password"]
)
else:
rcon(
f'tellraw @a [{{"text":" ","color":"gold"}},{{"text":"{line}","color":"yellow","italic":true}}]',
config["rcon_host"], config["rcon_port"], config["rcon_password"]
)
time.sleep(0.2)
fallback = praying_player or (context["online_players"][0] if context["online_players"] else "")
max_cmds = config.get("max_commands_per_response", 6)
if debug and commands:
safe_cmds = " | ".join(commands[:max_cmds]).replace("\\", "\\\\").replace('"', '\\"')
rcon(
f'tellraw @a {{"text":"[~] {safe_cmds}","color":"dark_gray","italic":true}}',
config["rcon_host"], config["rcon_port"], config["rcon_password"]
)
for cmd in commands[:max_cmds]:
resolved, is_safe = validate_command(cmd, context["online_players"], fallback, config)
if not is_safe:
continue
log.info(f"Executing RCON: {resolved}")
result = rcon(resolved, config["rcon_host"], config["rcon_port"], config["rcon_password"])
log.info(f"RCON result: {result!r}")
if resolved.startswith("weather "):
if "thunder" in resolved: config["_weather_state"] = "thunderstorm"
elif "rain" in resolved: config["_weather_state"] = "rain"
elif "clear" in resolved: config["_weather_state"] = "clear"
time.sleep(0.3)
def _limit_player_kills(commands: list, online_players: list) -> list:
"""Allow at most one player-kill command in a command list."""
out = []
player_kills = 0
for cmd in commands:
m = re.search(r'\bkill\s+(\w+)\b', cmd)
if m and m.group(1) in online_players:
if player_kills >= 1:
continue
player_kills += 1
out.append(cmd)
return out
def process_first_login_benevolence(player, config):
"""
On a player's first observed login, perform a random benevolent act.
Uses command model for actions and message model for flavor text.
"""
if not config.get("first_login_benevolence_enabled", True):
return
if has_first_login_seen(player):
return
# Mark first to avoid duplicate firing on reconnect storms
mark_first_login_seen(player, config)
try:
context = get_server_context(config)
except Exception as e:
log.warning(f"First-login benevolence: could not fetch context: {e}")
context = {
"online_players": [player],
"player_details": {},
"time_of_day": "unknown",
"weather": "unknown",
"world_border": None,
"scoreboards": {},
}
user_msg = (
f"Player first login event: {player}\n"
+ _build_context_block(context, extras=get_log_context_block())
+ f"\nGenerate a benevolent first-login blessing for {player}."
)
command_model = config.get("command_model", config["model"])
message_model = config.get("model")
message = ""
if _gateway_enabled(config):
out = _gateway_send(
player=player,
mode="god_system",
text=f"first login benevolence event for {player}",
context_payload={
"server_state": context,
"recent_events": get_log_context_block(),
"event": "first_login_benevolence",
"target_player": player,
},
config=config,
allow_tools=bool(config.get("gateway_allow_tools_system", False)),
max_tool_steps=int(config.get("gateway_max_tool_steps", 4)),
)
commands = out.get("commands") or []
message = out.get("message") or ""
else:
try:
cmd_content = _llm_call(
model=command_model,
system=FIRST_LOGIN_BENEVOLENCE_PROMPT,
user=user_msg,
config=config,
fmt="json",
temperature=0.4,
max_tokens=220,
)
parsed = _parse_llm_json(cmd_content)
commands = (parsed.get("commands") or [])
except Exception as e:
log.error(f"First-login benevolence commands call failed: {e}")
commands = []
commands = _limit_player_kills(commands, context.get("online_players", []))
# Ensure there are multiple beneficial commands even if model under-produces
if len(commands) < 2:
commands = [
f"effect give {player} minecraft:regeneration 120 1",
f"give {player} minecraft:bread 16",
f"execute at {player} run summon minecraft:firework_rocket ~ ~1 ~",
]
# Optional message (local path only if gateway did not already provide one)
if not _gateway_enabled(config):
try:
msg_user = (
f"First login blessing for {player}.\n"
f"Commands chosen: {commands}\n"
"Write a benevolent divine proclamation to all players."
)
message = _llm_call(
model=message_model,
system=build_message_system_prompt(config),
user=msg_user,
config=config,
fmt=None,
temperature=0.85,
max_tokens=min(220, int(config.get("max_tokens", 600))),
).strip()
except Exception:
message = f"A blessing descends upon {player} for their first steps in this world."
elif not message:
message = f"A blessing descends upon {player} for their first steps in this world."
max_cmds = int(config.get("first_login_benevolence_max_commands", 10))
execute_response(
{"message": message, "commands": commands[:max_cmds]},
context,
config,
praying_player=player,
)
log.info(f"First-login benevolence executed for {player}: {commands[:max_cmds]}")
def process_sudo(player, prompt, config):
"""
sudo translator mode:
- no God persona
- no speech generation
- translates natural language to whitelisted commands
- only authorized user can execute
"""
if not config.get("sudo_enabled", True):
return
sudo_user = config.get("sudo_user", "slingshooter08")
if player != sudo_user:
# Keep this private and quiet
rcon(
f'tellraw {player} {{"text":"[SUDO] Unauthorized.","color":"red"}}',
config["rcon_host"], config["rcon_port"], config["rcon_password"]
)
return
# Immediate private ack
rcon(
f'tellraw {player} {{"text":"[SUDO] Translating...","color":"gray","italic":true}}',
config["rcon_host"], config["rcon_port"], config["rcon_password"]
)
# Deterministic template manager (search/download/install)
if process_sudo_template_command(player, prompt, config):
return
# Deterministic lookup mode: information only, no command execution.
low = prompt.lower().strip()
lookup_prefixes = ("lookup ", "search ", "wiki ", "explain ", "what is ", "how do i ", "how to ")
if low.startswith(lookup_prefixes):
query = prompt.strip()
if low.startswith("lookup "):
query = prompt[len("lookup "):].strip()
elif low.startswith("search "):
query = prompt[len("search "):].strip()
elif low.startswith("wiki "):
query = prompt[len("wiki "):].strip()
if not query:
_send_private(player, "[SUDO-LOOKUP] Usage: sudo lookup <question>", config, "yellow")
return
try:
_send_private(player, "[SUDO-LOOKUP] Result:", config, "aqua")
wiki_rows = _info_lookup_wiki(query)
web_rows = _info_lookup_web(query)
if wiki_rows:
_send_private(player, "minecraft.wiki:", config, "dark_aqua")
for row in wiki_rows[:3]:
_send_private(player, f"- {row}", config, "gray")
if web_rows:
_send_private(player, "web.search:", config, "dark_aqua")
for row in web_rows[:3]:
_send_private(player, f"- {row}", config, "gray")
# Optional model-assisted explanation/justification.
if _gateway_enabled(config):
out = _gateway_send(
player=player,
mode="sudo",
text=f"lookup only: {query}",
context_payload={
"mode": "sudo_lookup",
"lookup_only": True,
"query": query,
"wiki_hits": wiki_rows,
"web_hits": web_rows,
},
config=config,
allow_tools=bool(config.get("gateway_allow_tools_sudo", True)),
max_tool_steps=int(config.get("gateway_max_tool_steps", 4)),
)
msg = (out.get("message") or "").strip()
trace = out.get("tool_trace") or []
if msg:
_send_private(player, "justification:", config, "dark_aqua")
for ln in re.split(r"\n+", msg)[:3]:
ln = ln.strip()
if ln:
_send_private(player, f"- {ln[:250]}", config, "gray")
if trace:
_send_private(player, "sources used:", config, "dark_aqua")
for t in trace[:3]:
tool = str(t.get("tool", "tool"))
q = str(t.get("input", ""))[:80]
_send_private(player, f"- {tool}: {q}", config, "dark_gray")
if not wiki_rows and not web_rows:
_send_private(player, "No lookup results found.", config, "yellow")
except Exception as e:
_send_private(player, f"[SUDO-LOOKUP] Failed: {e}", config, "red")
return
online = players_online(config)
# Deterministic builder templates (Paper-optimized) for sudo build requests.
templated = build_template_commands(player, prompt, config)
if templated is not None:
commands = templated[: int(config.get("sudo_max_commands", 12))]
safe_preview = " | ".join(commands).replace("\\", "\\\\").replace('"', '\\"')
rcon(
f'tellraw {player} {{"text":"[SUDO-BUILD] {safe_preview}","color":"dark_aqua","italic":true}}',
config["rcon_host"], config["rcon_port"], config["rcon_password"]
)
executed = []
for cmd in commands:
resolved, is_safe = validate_command(cmd, online, player, config)
if not is_safe:
continue
log.info(f"SUDO-BUILD execute: {resolved}")
result = rcon(resolved, config["rcon_host"], config["rcon_port"], config["rcon_password"])
log.info(f"SUDO-BUILD result: {result!r}")
executed.append(resolved)
time.sleep(0.15)
add_sudo_history(player, prompt, commands, executed)
return
# Collect positions for all online players.
position_lines = []
for p in online:
try:
pos_raw = rcon(
f"data get entity {p} Pos",
config["rcon_host"], config["rcon_port"], config["rcon_password"]
)
pos_m = re.findall(r'(-?[\d.]+)d', pos_raw)
if pos_m and len(pos_m) >= 3:
x, y, z = int(float(pos_m[0])), int(float(pos_m[1])), int(float(pos_m[2]))
position_lines.append(f" {p}: x={x}, y={y}, z={z}")
except Exception:
pass
positions_block = (
"Player positions:\n" + "\n".join(position_lines)
if position_lines else ""
)
context_hint = (
f"Requesting player: {player}\n"
f"Online players: {', '.join(online) or 'none'}\n"
+ (positions_block + "\n" if positions_block else "")
+ f"Natural language request: {prompt}\n"
+ get_sudo_history_block()
)
command_model = config.get("command_model", config["model"])
def _local_sudo_translate() -> list:
content = _llm_call(
model=command_model,
system=build_sudo_commands_system_prompt(config),
user=context_hint,
config=config,
fmt="json",
temperature=0.1,
max_tokens=180,
)
parsed = _parse_llm_json(content)
return parsed.get("commands") or []
try:
if _gateway_enabled(config):
try:
out = _gateway_send(
player=player,
mode="sudo",
text=prompt,
context_payload={
"online_players": online,
"sudo_history": get_sudo_history_block(),
"mode": "sudo",
},
config=config,
allow_tools=bool(config.get("gateway_allow_tools_sudo", False)),
max_tool_steps=int(config.get("gateway_max_tool_steps", 2)),
)
log.info(f"Gateway sudo tool_trace={out.get('tool_trace', [])}")
commands = out.get("commands") or []
except Exception as e:
log.warning(f"Gateway sudo failed, falling back to local translator: {e}")
commands = _local_sudo_translate()
else:
commands = _local_sudo_translate()
except Exception as e:
log.error(f"SUDO translation failed: {e}")
rcon(
f'tellraw {player} {{"text":"[SUDO] Translation failed.","color":"red"}}',
config["rcon_host"], config["rcon_port"], config["rcon_password"]
)
return
max_cmds = config.get("sudo_max_commands", 3)
commands = commands[:max_cmds]
if not commands:
add_sudo_history(player, prompt, [], [])
rcon(
f'tellraw {player} {{"text":"[SUDO] No safe command generated.","color":"yellow"}}',
config["rcon_host"], config["rcon_port"], config["rcon_password"]
)
return
# Show translated command(s) privately
safe_preview = " | ".join(commands).replace("\\", "\\\\").replace('"', '\\"')
rcon(
f'tellraw {player} {{"text":"[SUDO] {safe_preview}","color":"dark_gray","italic":true}}',
config["rcon_host"], config["rcon_port"], config["rcon_password"]
)
executed = []
for cmd in commands:
resolved, is_safe = validate_command(cmd, online, player, config)
if not is_safe:
continue
log.info(f"SUDO execute: {resolved}")
result = rcon(resolved, config["rcon_host"], config["rcon_port"], config["rcon_password"])
log.info(f"SUDO result: {result!r}")
executed.append(resolved)
time.sleep(0.2)
add_sudo_history(player, prompt, commands, executed)
def get_player_xyz(player: str, config):
"""Return integer xyz for a player using RCON entity data."""
raw = rcon(
f"data get entity {player} Pos",
config["rcon_host"], config["rcon_port"], config["rcon_password"],
)
vals = re.findall(r'(-?[\d.]+)d', raw)
if len(vals) >= 3:
return int(float(vals[0])), int(float(vals[1])), int(float(vals[2]))
return None
def build_template_commands(player: str, prompt: str, config):
"""
Deterministic paper/builder templates for sudo build requests.
Returns command list or None if prompt is not a build template request.
"""
p = prompt.lower().strip()
if not (p.startswith("build ") or p.startswith("make ") or p.startswith("create ")):
return None
pos = get_player_xyz(player, config)
if not pos:
return [
f"give {player} minecraft:oak_log 256",
f"give {player} minecraft:oak_planks 256",
f"give {player} minecraft:stone_bricks 256",
f"give {player} minecraft:torch 64",
f"give {player} minecraft:oak_door 4",
f"give {player} minecraft:glass 64",
]
x, y, z = pos
# Place slightly in front of player position to avoid trapping player.
bx, by, bz = x + 3, y, z + 3
# Simple templates built with fill/setblock so they are plugin-agnostic and fast on Paper.
if "house" in p or "hut" in p or "cabin" in p:
return [
f"fill {bx} {by} {bz} {bx+6} {by} {bz+6} minecraft:oak_planks",
f"fill {bx} {by+1} {bz} {bx+6} {by+4} {bz+6} minecraft:air",
f"fill {bx} {by+1} {bz} {bx+6} {by+3} {bz} minecraft:oak_planks",
f"fill {bx} {by+1} {bz+6} {bx+6} {by+3} {bz+6} minecraft:oak_planks",
f"fill {bx} {by+1} {bz} {bx} {by+3} {bz+6} minecraft:oak_planks",
f"fill {bx+6} {by+1} {bz} {bx+6} {by+3} {bz+6} minecraft:oak_planks",
f"setblock {bx+3} {by+1} {bz} minecraft:oak_door",
f"setblock {bx+3} {by+2} {bz} minecraft:oak_door[half=upper]",
f"fill {bx} {by+4} {bz} {bx+6} {by+4} {bz+6} minecraft:spruce_planks",
f"setblock {bx+1} {by+1} {bz+1} minecraft:crafting_table",
f"setblock {bx+2} {by+1} {bz+1} minecraft:furnace",
f"setblock {bx+5} {by+1} {bz+5} minecraft:red_bed",
f"give {player} minecraft:torch 16",
]
if "tower" in p:
return [
f"fill {bx} {by} {bz} {bx+4} {by} {bz+4} minecraft:stone_bricks",
f"fill {bx} {by+1} {bz} {bx+4} {by+15} {bz+4} minecraft:air",
f"fill {bx} {by+1} {bz} {bx} {by+14} {bz+4} minecraft:stone_bricks",
f"fill {bx+4} {by+1} {bz} {bx+4} {by+14} {bz+4} minecraft:stone_bricks",
f"fill {bx} {by+1} {bz} {bx+4} {by+14} {bz} minecraft:stone_bricks",
f"fill {bx} {by+1} {bz+4} {bx+4} {by+14} {bz+4} minecraft:stone_bricks",
f"setblock {bx+2} {by+1} {bz} minecraft:iron_door",
f"setblock {bx+2} {by+2} {bz} minecraft:iron_door[half=upper]",
f"fill {bx} {by+15} {bz} {bx+4} {by+15} {bz+4} minecraft:stone_bricks",
f"give {player} minecraft:ladder 64",
]
if "wall" in p:
return [
f"fill {bx} {by} {bz} {bx+25} {by+4} {bz} minecraft:cobblestone",
f"fill {bx+2} {by+1} {bz} {bx+3} {by+2} {bz} minecraft:air",
f"setblock {bx+2} {by+1} {bz} minecraft:oak_fence_gate",
f"setblock {bx+2} {by+2} {bz} minecraft:oak_fence_gate",
]
if "church" in p or "shrine" in p or "temple" in p:
return [
f"fill {bx} {by} {bz} {bx+10} {by} {bz+14} minecraft:stone_bricks",
f"fill {bx} {by+1} {bz} {bx+10} {by+8} {bz+14} minecraft:air",
f"fill {bx} {by+1} {bz} {bx+10} {by+6} {bz} minecraft:stone_bricks",
f"fill {bx} {by+1} {bz+14} {bx+10} {by+6} {bz+14} minecraft:stone_bricks",
f"fill {bx} {by+1} {bz} {bx} {by+6} {bz+14} minecraft:stone_bricks",
f"fill {bx+10} {by+1} {bz} {bx+10} {by+6} {bz+14} minecraft:stone_bricks",
f"setblock {bx+5} {by+1} {bz} minecraft:oak_door",
f"setblock {bx+5} {by+2} {bz} minecraft:oak_door[half=upper]",
f"fill {bx+4} {by+1} {bz+2} {bx+6} {by+1} {bz+4} minecraft:quartz_block",
f"setblock {bx+5} {by+2} {bz+3} minecraft:lantern",
f"fill {bx+2} {by+7} {bz+2} {bx+8} {by+9} {bz+12} minecraft:spruce_planks",
f"summon minecraft:firework_rocket {bx+5} {by+2} {bz+7}",
]
# Generic fallback build package
return [
f"give {player} minecraft:oak_log 256",
f"give {player} minecraft:oak_planks 256",
f"give {player} minecraft:stone_bricks 256",
f"give {player} minecraft:torch 64",
f"give {player} minecraft:oak_door 4",
f"give {player} minecraft:glass 64",
]
# ---------------------------------------------------------------------------
# Prayer handler
# ---------------------------------------------------------------------------
BIBLE_LINES = [
("", "gold", True),
("[=== THE HOLY SCRIPTURE ===]", "gold", True),
("", "gold", True),
("God watches over this server.", "yellow", False),
("Speak to him by typing in chat:", "white", False),
(" pray <your message>", "green", True),
("", "white", False),
("God is benevolent, but just.", "yellow", False),
("He hears every prayer — but answers as he sees fit.", "white", False),
("He may reward you, punish you, or act upon another player entirely.", "white", False),
("", "white", False),
("Examples:", "yellow", False),
(" pray Lord, bless my journey through the mines.", "gray", False),
(" pray Smite my enemy, for they have wronged me.", "gray", False),
(" pray Forgive me, I have sinned against thy creations.", "gray", False),
("", "white", False),
("Thou may only pray once every 20 seconds.", "red", False),
("Type \"bible\" in chat to see this again.", "gray", False),
("God intervenes unprompted. Watch the skies.", "dark_purple", True),
("", "gold", True),
("[========================]", "gold", True),
("", "gold", True),
]
def send_bible(player, config):
log.info(f"/bible requested by {player}")
h = config["rcon_host"]
p = config["rcon_port"]
pw = config["rcon_password"]
for text, color, bold in BIBLE_LINES:
bold_str = "true" if bold else "false"
safe = text.replace('"', '\\"')
rcon(f'tellraw {player} {{"text":"{safe}","color":"{color}","bold":{bold_str}}}', h, p, pw)
ACK_MESSAGES = [
"Your prayer has been received. The heavens stir...",
"The divine ear turns toward thee. Await judgement...",
"A silence falls across the heavens. God is listening...",
"Thy words rise like incense. An answer approaches...",
"The cosmos trembles with thy supplication. Patience...",
]
def process_prayer(player, prayer, config, cooldowns):
online = players_online(config)
if not online:
log.info("Prayer received but no players online — dropping")
return
now = time.time()
last = cooldowns.get(player, 0)
cooldown_secs = config.get("cooldown_seconds", 60)
if now - last < cooldown_secs:
remaining = int(cooldown_secs - (now - last))
rcon(
f'tellraw {player} {{"text":"[GOD] Thou must wait {remaining} more seconds before praying again.","color":"gold"}}',
config["rcon_host"], config["rcon_port"], config["rcon_password"]
)
return
cooldowns[player] = now
# Immediate acknowledgment
ack = random.choice(ACK_MESSAGES)
rcon(
f'tellraw {player} {{"text":"[GOD] {ack}","color":"gray","italic":true}}',
config["rcon_host"], config["rcon_port"], config["rcon_password"]
)
try:
context = get_server_context(config)
log.info(f"Server context: {context}")
except Exception as e:
log.warning(f"Could not fetch server context: {e}")
context = {"online_players": online, "time_of_day": "unknown",
"weather": "unknown", "world_border": None}
try:
response = ask_god(player, prayer[:300], context, config)
except json.JSONDecodeError as e:
log.error(f"LLM returned invalid JSON: {e}")
rcon(
f'tellraw @a {{"text":"[GOD] ...","color":"dark_gray"}}',
config["rcon_host"], config["rcon_port"], config["rcon_password"]
)
return
except Exception as e:
log.error(f"LLM error: {e}")
return
execute_response(response, context, config, praying_player=player)
# Store in prayer memory so God remembers this exchange
god_msg = response.get("message") or ""
if god_msg:
add_prayer_memory(player, prayer, god_msg, config)
# ---------------------------------------------------------------------------
# Divine intervention timer
# ---------------------------------------------------------------------------
def next_intervention_delay(avg_per_day):
avg_seconds = 86400.0 / avg_per_day
return random.expovariate(1.0 / avg_seconds)
def divine_intervention_loop(config):
avg_per_day = config.get("interventions_per_day", 4)
if avg_per_day <= 0:
log.info("Divine intervention disabled (interventions_per_day=0)")
return
log.info(f"Divine intervention loop started — avg {avg_per_day}/day")
while True:
delay = next_intervention_delay(avg_per_day)
log.info(f"Next divine intervention in {delay/3600:.2f}h ({int(delay)}s)")
time.sleep(delay)
online = players_online(config)
if not online:
log.info("Intervention timer fired — no players online, skipping")
continue
try:
context = get_server_context(config)
context["online_players"] = online
except Exception as e:
log.warning(f"Intervention: could not fetch server context: {e}")
context = {"online_players": online, "time_of_day": "unknown",
"weather": "unknown", "world_border": None}
try:
response = ask_god_intervention(context, config)
except Exception as e:
log.error(f"Intervention LLM error: {e}")
continue
if not (response.get("message") or response.get("commands")):
log.info("God chose silence this interval.")
continue
log.info("God intervenes unprompted.")
execute_response(response, context, config, praying_player=None)
# ---------------------------------------------------------------------------
# Log tail
# ---------------------------------------------------------------------------
def tail_log(log_path):
"""
Follow latest.log robustly across truncation/rotation.
Paper can recreate latest.log on restart; we detect inode changes and reopen.
"""
f = open(log_path, 'r')
f.seek(0, 2)
current_inode = os.fstat(f.fileno()).st_ino
while True:
line = f.readline()
if line:
yield line
continue
# No new line yet; check for rotation/truncation
try:
st = os.stat(log_path)
if st.st_ino != current_inode or st.st_size < f.tell():
f.close()
f = open(log_path, 'r')
current_inode = os.fstat(f.fileno()).st_ino
f.seek(0, 2)
log.info("Reattached log tail after latest.log rotation/truncation")
except FileNotFoundError:
# During restart, file may not exist briefly
pass
time.sleep(0.2)
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
with open(CONFIG_PATH) as f:
config = json.load(f)
log.info(f"mc_aigod starting — server: {config['server_name']}")
log.info(f"Log: {config['log_path']}")
log.info(f"LLM: {config['ollama_url']} model={config['model']}")
log.info(f"RCON: {config['rcon_host']}:{config['rcon_port']}")
load_prayer_memory(config)
load_first_login_seen(config)
cooldowns = {}
t = threading.Thread(target=divine_intervention_loop, args=(config,), daemon=True)
t.start()
for line in tail_log(config["log_path"]):
# Feed every line into the rolling log buffer
add_log_event(line)
# sudo translator
matched = False
for pat in SUDO_PATTERNS:
m = pat.search(line)
if m:
player = m.group(1)
prompt = m.group(2).strip()
log.info(f"SUDO from {player}: {prompt}")
try:
process_sudo(player, prompt, config)
except Exception as e:
log.error(f"Error processing sudo: {e}", exc_info=True)
matched = True
break
if matched:
continue
# /pray
matched = False
for pat in PRAY_PATTERNS:
m = pat.search(line)
if m:
player = m.group(1)
prayer = m.group(2).strip()
log.info(f"Prayer from {player}: {prayer}")
try:
process_prayer(player, prayer, config, cooldowns)
except Exception as e:
log.error(f"Error processing prayer: {e}", exc_info=True)
matched = True
break
if matched:
continue
# /bible
for pat in BIBLE_PATTERNS:
m = pat.search(line)
if m:
player = m.group(1)
try:
send_bible(player, config)
except Exception as e:
log.error(f"Error sending bible to {player}: {e}", exc_info=True)
break
# login notice
m = JOIN_PATTERN.search(line)
if m:
player = m.group(1)
log.info(f"Login notice → {player}")
try:
rcon(
f'tellraw {player} {{"text":"[GOD] GOD ENABLED — Type \\"bible\\" in chat for guidance. Type \\"pray <message>\\" to pray.","color":"gold","bold":true}}',
config["rcon_host"], config["rcon_port"], config["rcon_password"]
)
except Exception as e:
log.error(f"Error sending login notice to {player}: {e}", exc_info=True)
# First-login benevolence (once per player)
try:
process_first_login_benevolence(player, config)
except Exception as e:
log.error(f"Error running first-login benevolence for {player}: {e}", exc_info=True)
if __name__ == '__main__':
main()