Files
minecraft-ai-god-paper-fork/langgraph_gateway.py
T
Claude Code f243384d4e Semver rename + single-call gateway
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-20 21:37:37 -04:00

1204 lines
42 KiB
Python

#!/usr/bin/env python3
"""
langgraph_gateway.py
Session-based LLM gateway sidecar for Minecraft AI.
Provides:
- per-player sessions
- bounded tool loop (web.search, minecraft.wiki_lookup)
- final {message, commands, tool_trace} payload
This is intentionally lightweight and API-first.
Execution safety remains in mc_aigod_paper.py.
"""
import json
import hashlib
import logging
import os
import re
import socket
import sqlite3
import struct
import threading
import time
import uuid
from pathlib import Path
from urllib.parse import urlparse
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
import requests
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [gateway] %(levelname)s: %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler('/var/log/mc_langgraph_gateway.log'),
]
)
log = logging.getLogger(__name__)
CONFIG_PATH = '/etc/mc_langgraph_gateway.json'
class StartSessionRequest(BaseModel):
player: str
mode: str = Field(pattern='^(god|sudo|god_system)$')
class StartSessionResponse(BaseModel):
session_id: str
class MessageRequest(BaseModel):
role: str = Field(default='user')
text: str
context: Dict[str, Any] = Field(default_factory=dict)
allow_tools: bool = True
max_tool_steps: int = 4
class MessageResponse(BaseModel):
message: Optional[str] = None
commands: List[str] = Field(default_factory=list)
tool_trace: List[Dict[str, Any]] = Field(default_factory=list)
@dataclass
class SessionState:
session_id: str
player: str
mode: str
created_at: float = field(default_factory=time.time)
updated_at: float = field(default_factory=time.time)
messages: List[Dict[str, str]] = field(default_factory=list)
_sessions: Dict[str, SessionState] = {}
_sessions_lock = threading.Lock()
_kb_lock = threading.Lock()
_kb_index_cache: Dict[str, Any] = {'loaded_at': 0.0, 'docs': []}
_KB_ALLOWED_EXTS = {'.md', '.txt', '.json'}
# ---------------------------------------------------------------------------
# RCON world observation helper
# ---------------------------------------------------------------------------
def _rcon_query(cmd: str, host: str = '127.0.0.1', port: int = 25577,
password: str = 'REDACTED_RCON', timeout: float = 5.0) -> str:
"""Send a single RCON command and return the response text."""
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(timeout)
try:
s.connect((host, int(port)))
def pkt(req_id: int, pkt_type: int, payload: str) -> bytes:
p = payload.encode('utf-8') + b'\x00\x00'
return struct.pack('<iii', len(p) + 8, req_id, pkt_type) + p
# Authenticate (type 3)
s.sendall(pkt(1, 3, password))
time.sleep(0.15)
s.recv(4096)
# Send command (type 2)
s.sendall(pkt(2, 2, cmd))
time.sleep(0.2)
r = s.recv(4096)
return r[12:-2].decode('utf-8', errors='replace')
except Exception as e:
return f'RCON error: {e}'
finally:
s.close()
def _rcon_cfg() -> Dict[str, Any]:
"""Return RCON connection params from config."""
return {
'host': str(CFG.get('rcon_host', '127.0.0.1')),
'port': int(CFG.get('rcon_port', 25577)),
'password': str(CFG.get('rcon_password', 'REDACTED_RCON')),
}
COMMAND_PREFIXES_BY_MODE = {
'sudo': [
'give ', 'effect ', 'xp ', 'tp ', 'time ', 'weather ', 'execute ',
'kill ', 'summon ', 'tellraw ', 'worldborder ', 'fill ', 'setblock ',
'clone ', 'gamemode ', 'template ',
],
'god': [
'give ', 'effect ', 'xp ', 'tp ', 'time ', 'weather ', 'execute ',
'kill ', 'summon ', 'tellraw ', 'worldborder ',
],
'god_system': [
'give ', 'effect ', 'xp ', 'tp ', 'time ', 'weather ', 'execute ',
'kill ', 'summon ', 'tellraw ', 'worldborder ',
],
}
def load_config() -> Dict[str, Any]:
try:
with open(CONFIG_PATH) as f:
return json.load(f)
except FileNotFoundError:
log.warning('Config not found, using defaults')
return {
'ollama_url': 'http://127.0.0.1:11434',
'message_model': 'gemma3:12b',
'command_model': 'qwen3-coder:30b',
'tool_model': 'qwen2.5:1.5b',
'session_ttl_seconds': 21600,
'knowledge_base_dir': '/var/lib/mc-langgraph-gateway/knowledge',
'knowledge_index_file': '/var/lib/mc-langgraph-gateway/knowledge/index.json',
'knowledge_auto_index_on_start': True,
'knowledge_bootstrap_on_start': True,
'knowledge_bootstrap_urls': [
'https://minecraft.wiki/w/Commands/fill',
'https://minecraft.wiki/w/Commands/setblock',
'https://minecraft.wiki/w/Commands/clone',
'https://minecraft.wiki/w/Commands/summon',
'https://minecraft.wiki/w/Commands/execute',
'https://minecraft.wiki/w/TNT',
'https://minecraft.wiki/w/Explosion',
'https://minecraft.wiki/w/Tutorial:Worldedit',
],
'knowledge_max_doc_bytes': 200000,
}
CFG = load_config()
DB_PATH = CFG.get('session_db_path', '/var/lib/mc-langgraph-gateway/sessions.db')
_db_lock = threading.Lock()
def _kb_root() -> str:
root = str(CFG.get('knowledge_base_dir', '/var/lib/mc-langgraph-gateway/knowledge')).strip()
return root or '/var/lib/mc-langgraph-gateway/knowledge'
def _kb_index_path() -> str:
path = str(CFG.get('knowledge_index_file', '')).strip()
if path:
return path
return os.path.join(_kb_root(), 'index.json')
def _kb_tokenize(text: str) -> List[str]:
toks = re.findall(r'[a-z0-9_]{2,}', (text or '').lower())
if not toks:
return []
out: List[str] = []
seen = set()
for t in toks:
if t in seen:
continue
seen.add(t)
out.append(t)
if len(out) >= 300:
break
return out
def _kb_html_to_text(html: str) -> str:
body = re.sub(r'(?is)<script.*?>.*?</script>', ' ', html or '')
body = re.sub(r'(?is)<style.*?>.*?</style>', ' ', body)
body = re.sub(r'(?is)<[^>]+>', ' ', body)
body = re.sub(r'\s+', ' ', body).strip()
return body
def _kb_slug(s: str) -> str:
n = re.sub(r'[^a-zA-Z0-9._-]+', '_', (s or '').strip())
n = n.strip('._-')
return (n[:80] or 'doc').lower()
def _kb_fetch_url(url: str) -> Dict[str, Any]:
max_bytes = int(CFG.get('knowledge_max_doc_bytes', 200000))
r = requests.get(url, timeout=25)
r.raise_for_status()
ct = (r.headers.get('content-type') or '').lower()
raw = r.content[:max_bytes]
if 'html' in ct:
text = _kb_html_to_text(raw.decode(errors='replace'))
else:
text = raw.decode(errors='replace')
title = ''
m = re.search(r'(?is)<title>(.*?)</title>', r.text if 'html' in ct else '')
if m:
title = re.sub(r'\s+', ' ', m.group(1)).strip()
return {'title': title, 'text': text}
def _kb_ingest_url(url: str) -> Dict[str, Any]:
parsed = urlparse(url)
host = (parsed.netloc or '').lower()
if host not in set(str(h).lower() for h in CFG.get('knowledge_allowed_hosts', [
'minecraft.wiki', 'www.minecraft.wiki', 'docs.papermc.io', 'intellectualsites.github.io', 'enginehub.org', 'worldedit.enginehub.org'
])):
return {'ok': False, 'error': f'host not allowed: {host}'}
try:
fetched = _kb_fetch_url(url)
text = (fetched.get('text') or '').strip()
if len(text) < 80:
return {'ok': False, 'error': 'document too short'}
title = fetched.get('title') or os.path.basename(parsed.path) or host
root = Path(_kb_root())
root.mkdir(parents=True, exist_ok=True)
digest = hashlib.sha1(url.encode()).hexdigest()[:12]
fname = f"{_kb_slug(title)}_{digest}.md"
out = root / fname
out.write_text(f"# {title}\n\nSource: {url}\n\n{text}\n", encoding='utf-8')
return {'ok': True, 'path': str(out), 'source': url, 'title': title}
except Exception as e:
return {'ok': False, 'error': str(e)}
def _kb_build_index() -> Dict[str, Any]:
root = Path(_kb_root())
root.mkdir(parents=True, exist_ok=True)
docs = []
for p in root.rglob('*'):
if not p.is_file() or p.suffix.lower() not in _KB_ALLOWED_EXTS:
continue
try:
text = p.read_text(encoding='utf-8', errors='replace')
except Exception:
continue
title = p.name
m = re.search(r'^#\s+(.+)$', text, re.MULTILINE)
if m:
title = m.group(1).strip()[:120]
snippet = re.sub(r'\s+', ' ', text[:800]).strip()
tokens = _kb_tokenize(text)
rel = str(p.relative_to(root))
doc_id = hashlib.sha1(rel.encode()).hexdigest()[:12]
docs.append({
'id': doc_id,
'path': rel,
'title': title,
'snippet': snippet[:260],
'tokens': tokens,
'mtime': p.stat().st_mtime,
})
out = {'generated_at': time.time(), 'docs': docs}
idx = Path(_kb_index_path())
idx.parent.mkdir(parents=True, exist_ok=True)
idx.write_text(json.dumps(out, ensure_ascii=True), encoding='utf-8')
with _kb_lock:
_kb_index_cache['loaded_at'] = time.time()
_kb_index_cache['docs'] = docs
return {'ok': True, 'count': len(docs), 'path': str(idx)}
def _kb_load_index(force: bool = False) -> List[Dict[str, Any]]:
with _kb_lock:
if _kb_index_cache.get('docs') and not force:
return list(_kb_index_cache['docs'])
idx = Path(_kb_index_path())
if not idx.exists():
_kb_build_index()
try:
data = json.loads(idx.read_text(encoding='utf-8'))
except Exception:
_kb_build_index()
data = json.loads(idx.read_text(encoding='utf-8'))
docs = data.get('docs') or []
with _kb_lock:
_kb_index_cache['loaded_at'] = time.time()
_kb_index_cache['docs'] = docs
return docs
def _kb_bootstrap_if_needed() -> None:
if not bool(CFG.get('knowledge_bootstrap_on_start', True)):
return
root = Path(_kb_root())
root.mkdir(parents=True, exist_ok=True)
existing = [p for p in root.rglob('*') if p.is_file() and p.suffix.lower() in _KB_ALLOWED_EXTS]
if existing:
return
urls = CFG.get('knowledge_bootstrap_urls', []) or []
if not urls:
return
ok = 0
for url in urls:
res = _kb_ingest_url(str(url))
if res.get('ok'):
ok += 1
log.info('knowledge bootstrap completed: %d/%d docs ingested', ok, len(urls))
def _kb_search(query: str, limit: int = 5) -> List[Dict[str, Any]]:
docs = _kb_load_index()
q_tokens = set(_kb_tokenize(query))
if not q_tokens:
return []
scored = []
q_lower = query.lower()
for d in docs:
tokens = set(d.get('tokens') or [])
overlap = len(q_tokens.intersection(tokens))
if overlap <= 0:
continue
score = overlap
if q_lower in (d.get('title', '').lower()):
score += 3
if q_lower in (d.get('snippet', '').lower()):
score += 1
scored.append((score, d))
scored.sort(key=lambda x: x[0], reverse=True)
out = []
for _, d in scored[:max(1, limit)]:
out.append({
'doc_id': d.get('id'),
'title': d.get('title'),
'path': d.get('path'),
'snippet': d.get('snippet'),
})
return out
def _kb_read(doc_id: str, query: str = '') -> Dict[str, Any]:
docs = _kb_load_index()
hit = None
for d in docs:
if d.get('id') == doc_id:
hit = d
break
if not hit:
return {'ok': False, 'error': 'doc_id not found', 'results': []}
full = Path(_kb_root()) / str(hit.get('path'))
if not full.exists():
return {'ok': False, 'error': 'file missing', 'results': []}
text = full.read_text(encoding='utf-8', errors='replace')
q = (query or '').strip().lower()
if q and q in text.lower():
idx = text.lower().find(q)
start = max(0, idx - 350)
end = min(len(text), idx + 650)
excerpt = text[start:end]
else:
excerpt = text[:1000]
return {
'ok': True,
'results': [{
'doc_id': doc_id,
'title': hit.get('title'),
'path': hit.get('path'),
'text': re.sub(r'\s+', ' ', excerpt).strip(),
}],
}
def _db_enabled() -> bool:
return bool(CFG.get('session_persistence_enabled', True))
def _db_conn() -> sqlite3.Connection:
return sqlite3.connect(DB_PATH, timeout=10)
def _db_init():
if not _db_enabled():
return
os.makedirs(os.path.dirname(DB_PATH), exist_ok=True)
with _db_lock:
conn = _db_conn()
try:
conn.execute(
'CREATE TABLE IF NOT EXISTS sessions ('
'session_id TEXT PRIMARY KEY,'
'player TEXT NOT NULL,'
'mode TEXT NOT NULL,'
'created_at REAL NOT NULL,'
'updated_at REAL NOT NULL,'
'messages_json TEXT NOT NULL'
')'
)
conn.execute('CREATE INDEX IF NOT EXISTS idx_sessions_player_mode ON sessions(player, mode)')
conn.execute('CREATE INDEX IF NOT EXISTS idx_sessions_updated_at ON sessions(updated_at)')
conn.commit()
finally:
conn.close()
def _session_to_row(s: SessionState):
return (
s.session_id,
s.player,
s.mode,
s.created_at,
s.updated_at,
json.dumps(s.messages[-60:], ensure_ascii=True),
)
def _row_to_session(row) -> SessionState:
messages: List[Dict[str, str]]
try:
messages = json.loads(row[5])
if not isinstance(messages, list):
messages = []
except Exception:
messages = []
return SessionState(
session_id=row[0],
player=row[1],
mode=row[2],
created_at=float(row[3]),
updated_at=float(row[4]),
messages=messages,
)
def _db_upsert_session(s: SessionState):
if not _db_enabled():
return
with _db_lock:
conn = _db_conn()
try:
conn.execute(
'INSERT INTO sessions (session_id, player, mode, created_at, updated_at, messages_json) '
'VALUES (?, ?, ?, ?, ?, ?) '
'ON CONFLICT(session_id) DO UPDATE SET '
'player=excluded.player, mode=excluded.mode, created_at=excluded.created_at, '
'updated_at=excluded.updated_at, messages_json=excluded.messages_json',
_session_to_row(s),
)
conn.commit()
finally:
conn.close()
def _db_get_by_id(session_id: str) -> Optional[SessionState]:
if not _db_enabled():
return None
with _db_lock:
conn = _db_conn()
try:
cur = conn.execute(
'SELECT session_id, player, mode, created_at, updated_at, messages_json '
'FROM sessions WHERE session_id = ? LIMIT 1',
(session_id,),
)
row = cur.fetchone()
finally:
conn.close()
if not row:
return None
return _row_to_session(row)
def _db_get_by_player_mode(player: str, mode: str) -> Optional[SessionState]:
if not _db_enabled():
return None
with _db_lock:
conn = _db_conn()
try:
cur = conn.execute(
'SELECT session_id, player, mode, created_at, updated_at, messages_json '
'FROM sessions WHERE player = ? AND mode = ? ORDER BY updated_at DESC LIMIT 1',
(player, mode),
)
row = cur.fetchone()
finally:
conn.close()
if not row:
return None
return _row_to_session(row)
def _db_delete(session_id: str):
if not _db_enabled():
return
with _db_lock:
conn = _db_conn()
try:
conn.execute('DELETE FROM sessions WHERE session_id = ?', (session_id,))
conn.commit()
finally:
conn.close()
def _db_cleanup_expired(ttl_seconds: int) -> int:
if not _db_enabled():
return 0
cutoff = time.time() - ttl_seconds
with _db_lock:
conn = _db_conn()
try:
cur = conn.execute('DELETE FROM sessions WHERE updated_at < ?', (cutoff,))
conn.commit()
return int(cur.rowcount or 0)
finally:
conn.close()
def _cleanup_sessions():
ttl = int(CFG.get('session_ttl_seconds', 21600))
now = time.time()
with _sessions_lock:
dead = [sid for sid, s in _sessions.items() if now - s.updated_at > ttl]
for sid in dead:
_sessions.pop(sid, None)
if dead:
log.info('Cleaned %d expired sessions', len(dead))
db_dead = _db_cleanup_expired(ttl)
if db_dead:
log.info('Cleaned %d expired DB sessions', db_dead)
def _session_get(session_id: str) -> SessionState:
_cleanup_sessions()
with _sessions_lock:
s = _sessions.get(session_id)
if not s:
s = _db_get_by_id(session_id)
if s:
with _sessions_lock:
_sessions[session_id] = s
if not s:
raise HTTPException(status_code=404, detail='session not found')
return s
def _session_create(player: str, mode: str) -> SessionState:
_cleanup_sessions()
with _sessions_lock:
for existing in _sessions.values():
if existing.player == player and existing.mode == mode:
existing.updated_at = time.time()
_db_upsert_session(existing)
return existing
persisted = _db_get_by_player_mode(player, mode)
if persisted:
persisted.updated_at = time.time()
with _sessions_lock:
_sessions[persisted.session_id] = persisted
_db_upsert_session(persisted)
return persisted
sid = 'sess_' + uuid.uuid4().hex[:16]
s = SessionState(session_id=sid, player=player, mode=mode)
with _sessions_lock:
_sessions[sid] = s
_db_upsert_session(s)
return s
def _ollama_chat(model: str, messages: List[Dict[str, str]], *, fmt: Optional[str] = None,
temperature: float = 0.7, max_tokens: int = 400, timeout: int = 60) -> str:
payload: Dict[str, Any] = {
'model': model,
'messages': messages,
'stream': False,
'options': {
'temperature': temperature,
'num_predict': max_tokens,
}
}
if fmt:
payload['format'] = fmt
r = requests.post(f"{CFG['ollama_url']}/api/chat", json=payload, timeout=timeout)
r.raise_for_status()
return r.json()['message']['content']
def _parse_json(content: str) -> Dict[str, Any]:
try:
return json.loads(content)
except json.JSONDecodeError:
# salvage commands if partially valid
cmds = []
m = re_search_commands(content)
if m:
cmds = m
return {'commands': cmds, 'message': ''}
def re_search_commands(content: str) -> List[str]:
import re
m = re.search(r'"commands"\s*:\s*\[(.*?)(?:\]|$)', content, re.DOTALL)
if not m:
return []
return re.findall(r'"([^"]+)"', m.group(1))
def tool_web_search(query: str) -> Dict[str, Any]:
try:
r = requests.get('https://api.duckduckgo.com/', params={
'q': query,
'format': 'json',
'no_redirect': 1,
'no_html': 1,
}, timeout=20)
r.raise_for_status()
data = r.json()
out = []
if data.get('AbstractText'):
out.append({'title': 'Abstract', 'text': data['AbstractText']})
for item in (data.get('RelatedTopics') or [])[:3]:
if isinstance(item, dict) and item.get('Text'):
out.append({'title': item.get('FirstURL', ''), 'text': item['Text']})
return {'ok': True, 'results': out}
except Exception as e:
return {'ok': False, 'error': str(e), 'results': []}
def tool_wiki_lookup(query: str) -> Dict[str, Any]:
try:
s = requests.get('https://minecraft.wiki/api.php', params={
'action': 'opensearch',
'format': 'json',
'search': query,
'limit': 3,
}, timeout=20)
s.raise_for_status()
data = s.json()
titles = data[1] if len(data) > 1 else []
if not titles:
return {'ok': True, 'results': []}
results = []
for t in titles:
e = requests.get('https://minecraft.wiki/api.php', params={
'action': 'query',
'format': 'json',
'prop': 'extracts',
'exintro': 1,
'explaintext': 1,
'titles': t,
}, timeout=20)
e.raise_for_status()
pages = e.json().get('query', {}).get('pages', {})
extract = ''
for p in pages.values():
extract = (p.get('extract') or '')[:500]
break
results.append({'title': t, 'text': extract})
return {'ok': True, 'results': results}
except Exception as e:
return {'ok': False, 'error': str(e), 'results': []}
# ---------------------------------------------------------------------------
# World observation tools (RCON-based)
# ---------------------------------------------------------------------------
_ENTITY_TYPES_SCAN = [
'zombie', 'skeleton', 'creeper', 'spider', 'enderman', 'witch',
'phantom', 'drowned', 'husk', 'stray', 'pillager', 'vindicator',
'cow', 'pig', 'sheep', 'chicken', 'horse', 'wolf', 'cat', 'villager',
'iron_golem', 'snow_golem', 'bee', 'fox', 'rabbit', 'squid',
'bat', 'parrot', 'turtle', 'dolphin', 'axolotl', 'goat', 'frog',
'allay', 'sniffer', 'camel', 'armadillo', 'breeze', 'bogged',
'item', 'experience_orb', 'armor_stand', 'minecart', 'boat', 'tnt',
]
def _parse_pos(rcon_output: str) -> Optional[List[float]]:
"""Parse position from 'data get entity <p> Pos' RCON output."""
m = re.search(r'\[(-?[\d.]+)d,\s*(-?[\d.]+)d,\s*(-?[\d.]+)d\]', rcon_output)
if m:
return [float(m.group(1)), float(m.group(2)), float(m.group(3))]
return None
def tool_world_player_info(player: str) -> Dict[str, Any]:
"""Get player position, health, gamemode, and inventory summary."""
rc = _rcon_cfg()
try:
pos_raw = _rcon_query(f'data get entity {player} Pos', **rc)
pos = _parse_pos(pos_raw)
health_raw = _rcon_query(f'data get entity {player} Health', **rc)
health_m = re.search(r'([\d.]+)f', health_raw)
health = float(health_m.group(1)) if health_m else None
gm_raw = _rcon_query(f'data get entity {player} playerGameType', **rc)
gm_m = re.search(r'(\d+)', gm_raw)
gamemode_map = {0: 'survival', 1: 'creative', 2: 'adventure', 3: 'spectator'}
gamemode = gamemode_map.get(int(gm_m.group(1)), 'unknown') if gm_m else None
inv_raw = _rcon_query(f'data get entity {player} Inventory', **rc)
# Count inventory items (each item in the list is an entry)
inv_count = inv_raw.count('{') if 'entity data' in inv_raw.lower() else 0
return {
'ok': True,
'results': [{
'player': player,
'position': {'x': pos[0], 'y': pos[1], 'z': pos[2]} if pos else None,
'health': health,
'max_health': 20.0,
'gamemode': gamemode,
'inventory_items': inv_count,
}],
}
except Exception as e:
return {'ok': False, 'error': str(e), 'results': []}
def tool_world_nearby_entities(player: str, radius: int = 30) -> Dict[str, Any]:
"""Scan for entity types near a player within given radius."""
rc = _rcon_cfg()
radius = min(max(radius, 5), 60) # clamp to 5-60
try:
pos_raw = _rcon_query(f'data get entity {player} Pos', **rc)
pos = _parse_pos(pos_raw)
if not pos:
return {'ok': False, 'error': 'player not found or offline', 'results': []}
x, y, z = int(pos[0]), int(pos[1]), int(pos[2])
found = []
for etype in _ENTITY_TYPES_SCAN:
r = _rcon_query(
f'execute if entity @e[x={x},y={y},z={z},distance=..{radius},type=minecraft:{etype}]',
**rc
)
if 'passed' in r.lower():
count_m = re.search(r'Count:\s*(\d+)', r)
count = int(count_m.group(1)) if count_m else 1
found.append({'type': etype, 'count': count})
return {
'ok': True,
'results': [{
'player': player,
'scan_center': {'x': x, 'y': y, 'z': z},
'radius': radius,
'entities': found,
'total': sum(e['count'] for e in found),
}],
}
except Exception as e:
return {'ok': False, 'error': str(e), 'results': []}
def tool_world_check_block(x: int, y: int, z: int, block_type: str) -> Dict[str, Any]:
"""Check if a specific block type exists at coordinates."""
rc = _rcon_cfg()
try:
if not block_type.startswith('minecraft:'):
block_type = f'minecraft:{block_type}'
r = _rcon_query(f'execute if block {x} {y} {z} {block_type}', **rc)
return {
'ok': True,
'results': [{
'position': {'x': x, 'y': y, 'z': z},
'block_type': block_type,
'matches': 'passed' in r.lower(),
}],
}
except Exception as e:
return {'ok': False, 'error': str(e), 'results': []}
def tool_world_server_state() -> Dict[str, Any]:
"""Get server-level state: players, time, worldborder, difficulty."""
rc = _rcon_cfg()
try:
players_raw = _rcon_query('list', **rc)
time_raw = _rcon_query('time query daytime', **rc)
border_raw = _rcon_query('worldborder get', **rc)
diff_raw = _rcon_query('difficulty', **rc)
# Parse player list
players = []
m = re.search(r'online:\s*(.*)', players_raw)
if m and m.group(1).strip():
players = [p.strip() for p in m.group(1).split(',') if p.strip()]
count_m = re.search(r'(\d+) of a max of (\d+)', players_raw)
count = int(count_m.group(1)) if count_m else len(players)
max_players = int(count_m.group(2)) if count_m else 20
# Parse time
time_m = re.search(r'(\d+)', time_raw)
ticks = int(time_m.group(1)) if time_m else 0
# Convert to approximate in-game time (0=6:00, 6000=noon, 12000=18:00, 18000=midnight)
hours = ((ticks + 6000) % 24000) // 1000
is_night = ticks >= 13000 or ticks < 0
# Parse worldborder
border_m = re.search(r'(\d+)', border_raw)
border = int(border_m.group(1)) if border_m else None
# Parse difficulty
diff_m = re.search(r'difficulty is (\w+)', diff_raw)
difficulty = diff_m.group(1) if diff_m else 'unknown'
return {
'ok': True,
'results': [{
'players_online': players,
'player_count': count,
'max_players': max_players,
'time_ticks': ticks,
'approx_hour': hours,
'is_night': is_night,
'worldborder_width': border,
'difficulty': difficulty,
}],
}
except Exception as e:
return {'ok': False, 'error': str(e), 'results': []}
def tool_local_search(query: str) -> Dict[str, Any]:
try:
rows = _kb_search(query, limit=5)
return {'ok': True, 'results': rows}
except Exception as e:
return {'ok': False, 'error': str(e), 'results': []}
def tool_local_read(doc_id: str, query: str = '') -> Dict[str, Any]:
try:
return _kb_read(doc_id, query)
except Exception as e:
return {'ok': False, 'error': str(e), 'results': []}
def _tool_router(user_text: str, max_steps: int, mode: str, context: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Very small bounded heuristic tool planner."""
text = user_text.lower()
calls: List[Dict[str, Any]] = []
if max_steps <= 0:
return calls
# --- World observation tools (RCON-based) ---
# In sudo mode, get player info for position-aware command generation
world_enabled = bool(CFG.get('world_observation_enabled', True))
if world_enabled and mode == 'sudo':
player = str((context or {}).get('player') or '').strip()
if not player:
# Try to extract player from server_state context
ss = (context or {}).get('server_state') or {}
players = ss.get('online_players') or []
if players:
player = players[0] if len(players) == 1 else ''
if player:
# Always get player info for sudo -- position is critical for build/fill/tp commands
calls.append({'tool': 'world.player_info', 'player': player})
# Scan nearby entities if the request involves mobs, entities, or environmental awareness
if any(k in text for k in [
'mob', 'monster', 'entity', 'creature', 'animal', 'kill', 'clear',
'around', 'nearby', 'surround', 'area', 'here', 'near me',
'spawn', 'summon', 'destroy', 'nuke', 'tnt', 'protect', 'safe',
]):
calls.append({'tool': 'world.nearby_entities', 'player': player, 'radius': 30})
# In god/god_system mode, get server state for contextual awareness
if world_enabled and mode in ('god', 'god_system'):
calls.append({'tool': 'world.server_state'})
# --- Knowledge tools ---
if mode == 'sudo':
q = user_text
req = str((context or {}).get('request') or '').strip()
if req:
q = req
calls.append({'tool': 'local.search', 'query': q})
if any(k in text for k in ['wiki', 'minecraft', 'item id', 'recipe', 'craft']):
calls.append({'tool': 'minecraft.wiki_lookup', 'query': user_text})
if len(calls) < max_steps and any(k in text for k in ['what is', 'how to', 'search', 'lookup']):
calls.append({'tool': 'web.search', 'query': user_text})
return calls[:max_steps]
def _commands_prompt(mode: str) -> str:
allowed = ','.join(
p.strip() for p in COMMAND_PREFIXES_BY_MODE.get(mode, COMMAND_PREFIXES_BY_MODE['god'])
)
if mode == 'sudo':
return (
'You are a Minecraft command translator. Return ONLY JSON: {"commands": ["..."]}.\n'
f'Allowed command prefixes: {allowed}.\n'
'Output must be command strings only, no prose, no markdown, no labels, no leading slash.\n'
'Use TOOL results as your source of truth. Do not invent command syntax not supported by retrieved context.\n'
'Read context.sudo_failures and avoid repeating those exact failing patterns.\n'
'Never use old enchantment NBT {Enchantments:[...]} syntax; use item[enchantments={...}] format.\n'
'For TNT, never append a count to summon; use multiple summon commands instead.\n'
'Keep target scope narrow: if request is about "me/my", do not use @a unless explicitly requested.\n'
'You may output template workflow meta-commands: template search <query>, template pick <n> [name], template build <name>.\n'
'For build/make/create requests, prefer the template workflow instead of raw block-by-block commands.\n'
'If request is ambiguous or unsupported, choose a closest valid in-game workaround and keep scope bounded.\n'
'If still unsafe/unknown, return empty commands.\n'
'WORLD STATE: If world.player_info or world.nearby_entities tool results are present, use the player\'s '
'actual coordinates for fill/setblock/tp commands instead of ~ ~ ~ relative coords when absolute positioning '
'is more reliable. Use nearby entity info to make contextually aware decisions.'
)
if mode == 'god_system':
return (
'You are Minecraft divine system automation. Return ONLY JSON: {"commands": ["..."]}.\n'
f'Allowed command prefixes: {allowed}.\n'
'Output must be command strings only, no prose, no markdown, no labels, no leading slash.\n'
'Use valid 1.21 syntax: effect give <player> ..., and weather is clear/rain/thunder only.\n'
'This mode is for intervention/first-login events. Prefer benevolent or thematic world actions.\n'
'If you include kill commands, keep it to at most one player.'
)
return (
'You are Minecraft God command planner. Return ONLY JSON: {"commands": ["..."]}.\n'
f'Allowed command prefixes: {allowed}.\n'
'Output must be command strings only, no prose, no markdown, no labels, no leading slash.\n'
'Use valid 1.21 syntax: effect give <player> ..., and weather is clear/rain/thunder only.\n'
'Avoid accidental lethal vertical teleports in benevolent responses unless explicitly requested.\n'
'Do not use tp in helpful responses unless user explicitly asks for movement.\n'
'Balance benevolence and judgment based on context.\n'
'Use valid Minecraft command syntax only.'
)
def _message_prompt(mode: str) -> str:
if mode == 'sudo':
return 'Return empty string.'
if mode == 'god_system':
return (
'You are God speaking to all players about a system event. '
'Return plain text only, no JSON.'
)
return (
'You are God in Minecraft. Return a dramatic but clear message as plain text only.'
)
def _sanitize_commands(commands_raw: Any, mode: str) -> List[str]:
allowed_prefixes = COMMAND_PREFIXES_BY_MODE.get(mode, COMMAND_PREFIXES_BY_MODE['god'])
max_commands = int(CFG.get('gateway_max_commands', 8))
cleaned: List[str] = []
if not isinstance(commands_raw, list):
return []
for entry in commands_raw:
if not isinstance(entry, str):
continue
pieces = re.split(r'[\n;]+', entry)
for piece in pieces:
cmd = piece.strip()
if not cmd:
continue
if cmd.startswith('/'):
cmd = cmd[1:].strip()
cmd = cmd.rstrip(' .')
if len(cmd) < 3 or len(cmd) > 240:
continue
if cmd.lower().startswith('commands'):
continue
if not any(cmd.startswith(p) for p in allowed_prefixes):
continue
cleaned.append(cmd)
if len(cleaned) >= max_commands:
break
if len(cleaned) >= max_commands:
break
deduped: List[str] = []
seen = set()
for cmd in cleaned:
if cmd in seen:
continue
seen.add(cmd)
deduped.append(cmd)
return deduped
def run_pipeline(session: SessionState, req: MessageRequest) -> MessageResponse:
session.updated_at = time.time()
# Compose input text
context_json = json.dumps(req.context or {}, ensure_ascii=True)
user_text = req.text.strip()
user_blob = f"message: {user_text}\ncontext: {context_json}"
session.messages.append({'role': req.role, 'content': user_blob})
# Feedback-only messages update session state without running LLM/tools.
if bool((req.context or {}).get('feedback_only', False)):
session.messages.append({
'role': 'assistant',
'content': json.dumps({'message': '', 'commands': []}, ensure_ascii=True)
})
_db_upsert_session(session)
return MessageResponse(message=None, commands=[], tool_trace=[])
_db_upsert_session(session)
tool_trace: List[Dict[str, Any]] = []
tool_results_block = ''
if req.allow_tools:
calls = _tool_router(
user_text,
max(0, min(req.max_tool_steps, 6)),
session.mode,
req.context or {},
)
for c in calls:
tool = c['tool']
q = c.get('query', '')
tool_input_desc = q # default description for trace
if tool == 'web.search':
out = tool_web_search(q)
elif tool == 'minecraft.wiki_lookup':
out = tool_wiki_lookup(q)
elif tool == 'local.search':
out = tool_local_search(q)
elif tool == 'local.read':
out = tool_local_read(str(c.get('doc_id', '')), q)
elif tool == 'world.player_info':
p = str(c.get('player', ''))
tool_input_desc = p
out = tool_world_player_info(p)
elif tool == 'world.nearby_entities':
p = str(c.get('player', ''))
r = int(c.get('radius', 30))
tool_input_desc = f'{p} radius={r}'
out = tool_world_nearby_entities(p, r)
elif tool == 'world.check_block':
bx, by, bz = int(c.get('x', 0)), int(c.get('y', 0)), int(c.get('z', 0))
bt = str(c.get('block_type', ''))
tool_input_desc = f'{bx},{by},{bz} {bt}'
out = tool_world_check_block(bx, by, bz, bt)
elif tool == 'world.server_state':
tool_input_desc = 'server'
out = tool_world_server_state()
else:
out = {'ok': False, 'error': 'unknown tool', 'results': []}
tool_trace.append({'tool': tool, 'input': tool_input_desc, 'ok': out.get('ok', False), 'results_count': len(out.get('results', []))})
tool_results_block += f"\nTOOL {tool} query={tool_input_desc}\nRESULT={json.dumps(out, ensure_ascii=True)[:3000]}\n"
# localized retrieval hop: after index search, fetch one top document excerpt
if tool == 'local.search' and out.get('ok') and out.get('results') and len(tool_trace) < max(0, min(req.max_tool_steps, 6)):
top = out['results'][0]
doc_id = str(top.get('doc_id', ''))
if doc_id:
read_out = tool_local_read(doc_id, q)
tool_trace.append({'tool': 'local.read', 'input': doc_id, 'ok': read_out.get('ok', False), 'results_count': len(read_out.get('results', []))})
tool_results_block += f"\nTOOL local.read doc_id={doc_id}\nRESULT={json.dumps(read_out, ensure_ascii=True)[:3000]}\n"
# Single-call mode: one LLM call returns both commands and message
if CFG.get('single_call', False):
combined_prompt = _commands_prompt(session.mode)
if session.mode != 'sudo':
combined_prompt += '\n\nAlso include a "message" field with a dramatic in-character response.'
sc_messages = [
{'role': 'system', 'content': combined_prompt},
*session.messages[-12:],
{'role': 'user', 'content': user_blob + tool_results_block},
]
sc_raw = _ollama_chat(
CFG.get('command_model', CFG.get('model', 'mortdecai-v4')),
sc_messages,
fmt='json',
temperature=0.3,
max_tokens=600,
)
sc_parsed = _parse_json(sc_raw)
commands = _sanitize_commands(sc_parsed.get('commands') or [], session.mode)
message = sc_parsed.get('message') or None
else:
# Two-call mode: separate command and message calls
cmd_messages = [
{'role': 'system', 'content': _commands_prompt(session.mode)},
*session.messages[-12:],
{'role': 'user', 'content': user_blob + tool_results_block},
]
cmd_raw = _ollama_chat(
CFG.get('command_model', 'qwen3-coder:30b'),
cmd_messages,
fmt='json',
temperature=0.2,
max_tokens=220,
)
cmd_parsed = _parse_json(cmd_raw)
commands = _sanitize_commands(cmd_parsed.get('commands') or [], session.mode)
# Message call (not for sudo)
message = None
if session.mode != 'sudo':
msg_messages = [
{'role': 'system', 'content': _message_prompt(session.mode)},
*session.messages[-12:],
{'role': 'user', 'content': user_blob + f"\nChosen commands: {commands}" + tool_results_block},
]
message = _ollama_chat(
CFG.get('message_model', 'gemma3:12b'),
msg_messages,
fmt=None,
temperature=0.8,
max_tokens=500,
).strip()
# Save assistant summary back to session memory
session.messages.append({
'role': 'assistant',
'content': json.dumps({'message': message, 'commands': commands}, ensure_ascii=True)
})
_db_upsert_session(session)
return MessageResponse(message=message, commands=commands, tool_trace=tool_trace)
app = FastAPI(title='Minecraft LangGraph Gateway', version='0.1.0')
@app.get('/healthz')
def healthz():
return {'ok': True, 'sessions': len(_sessions)}
@app.post('/v1/session/start', response_model=StartSessionResponse)
def start_session(req: StartSessionRequest):
s = _session_create(req.player, req.mode)
log.info('session start player=%s mode=%s session=%s', req.player, req.mode, s.session_id)
return StartSessionResponse(session_id=s.session_id)
@app.post('/v1/session/{session_id}/message', response_model=MessageResponse)
def send_message(session_id: str, req: MessageRequest):
session = _session_get(session_id)
out = run_pipeline(session, req)
return out
@app.post('/v1/session/{session_id}/close')
def close_session(session_id: str):
with _sessions_lock:
existed = session_id in _sessions
_sessions.pop(session_id, None)
if not existed and _db_enabled():
existed = _db_get_by_id(session_id) is not None
_db_delete(session_id)
return {'closed': existed}
try:
_kb_bootstrap_if_needed()
if bool(CFG.get('knowledge_auto_index_on_start', True)):
meta = _kb_build_index()
log.info('knowledge index ready: %s docs=%s', meta.get('path'), meta.get('count'))
except Exception as e:
log.warning('knowledge bootstrap/index failed: %s', e)
_db_init()