#!/usr/bin/env python3 """ langgraph_gateway.py Session-based LLM gateway sidecar for Minecraft AI. Provides: - per-player sessions - bounded tool loop (web.search, minecraft.wiki_lookup) - final {message, commands, tool_trace} payload This is intentionally lightweight and API-first. Execution safety remains in mc_aigod_paper.py. """ import json import logging import os import re import sqlite3 import threading import time import uuid from dataclasses import dataclass, field from typing import Any, Dict, List, Optional import requests from fastapi import FastAPI, HTTPException from pydantic import BaseModel, Field logging.basicConfig( level=logging.INFO, format='%(asctime)s [gateway] %(levelname)s: %(message)s', handlers=[ logging.StreamHandler(), logging.FileHandler('/var/log/mc_langgraph_gateway.log'), ] ) log = logging.getLogger(__name__) CONFIG_PATH = '/etc/mc_langgraph_gateway.json' class StartSessionRequest(BaseModel): player: str mode: str = Field(pattern='^(god|sudo|god_system)$') class StartSessionResponse(BaseModel): session_id: str class MessageRequest(BaseModel): role: str = Field(default='user') text: str context: Dict[str, Any] = Field(default_factory=dict) allow_tools: bool = True max_tool_steps: int = 4 class MessageResponse(BaseModel): message: Optional[str] = None commands: List[str] = Field(default_factory=list) tool_trace: List[Dict[str, Any]] = Field(default_factory=list) @dataclass class SessionState: session_id: str player: str mode: str created_at: float = field(default_factory=time.time) updated_at: float = field(default_factory=time.time) messages: List[Dict[str, str]] = field(default_factory=list) _sessions: Dict[str, SessionState] = {} _sessions_lock = threading.Lock() COMMAND_PREFIXES_BY_MODE = { 'sudo': [ 'give ', 'effect ', 'xp ', 'tp ', 'time ', 'weather ', 'execute ', 'kill ', 'summon ', 'tellraw ', 'worldborder ', 'fill ', 'setblock ', 'clone ', ], 'god': [ 'give ', 'effect ', 'xp ', 'tp ', 'time ', 'weather ', 'execute ', 'kill ', 'summon ', 'tellraw ', 'worldborder ', ], 'god_system': [ 'give ', 'effect ', 'xp ', 'tp ', 'time ', 'weather ', 'execute ', 'kill ', 'summon ', 'tellraw ', 'worldborder ', ], } def load_config() -> Dict[str, Any]: try: with open(CONFIG_PATH) as f: return json.load(f) except FileNotFoundError: log.warning('Config not found, using defaults') return { 'ollama_url': 'http://127.0.0.1:11434', 'message_model': 'gemma3:12b', 'command_model': 'qwen3-coder:30b', 'tool_model': 'qwen2.5:1.5b', 'session_ttl_seconds': 21600, } CFG = load_config() DB_PATH = CFG.get('session_db_path', '/var/lib/mc-langgraph-gateway/sessions.db') _db_lock = threading.Lock() def _db_enabled() -> bool: return bool(CFG.get('session_persistence_enabled', True)) def _db_conn() -> sqlite3.Connection: return sqlite3.connect(DB_PATH, timeout=10) def _db_init(): if not _db_enabled(): return os.makedirs(os.path.dirname(DB_PATH), exist_ok=True) with _db_lock: conn = _db_conn() try: conn.execute( 'CREATE TABLE IF NOT EXISTS sessions (' 'session_id TEXT PRIMARY KEY,' 'player TEXT NOT NULL,' 'mode TEXT NOT NULL,' 'created_at REAL NOT NULL,' 'updated_at REAL NOT NULL,' 'messages_json TEXT NOT NULL' ')' ) conn.execute('CREATE INDEX IF NOT EXISTS idx_sessions_player_mode ON sessions(player, mode)') conn.execute('CREATE INDEX IF NOT EXISTS idx_sessions_updated_at ON sessions(updated_at)') conn.commit() finally: conn.close() def _session_to_row(s: SessionState): return ( s.session_id, s.player, s.mode, s.created_at, s.updated_at, json.dumps(s.messages[-60:], ensure_ascii=True), ) def _row_to_session(row) -> SessionState: messages: List[Dict[str, str]] try: messages = json.loads(row[5]) if not isinstance(messages, list): messages = [] except Exception: messages = [] return SessionState( session_id=row[0], player=row[1], mode=row[2], created_at=float(row[3]), updated_at=float(row[4]), messages=messages, ) def _db_upsert_session(s: SessionState): if not _db_enabled(): return with _db_lock: conn = _db_conn() try: conn.execute( 'INSERT INTO sessions (session_id, player, mode, created_at, updated_at, messages_json) ' 'VALUES (?, ?, ?, ?, ?, ?) ' 'ON CONFLICT(session_id) DO UPDATE SET ' 'player=excluded.player, mode=excluded.mode, created_at=excluded.created_at, ' 'updated_at=excluded.updated_at, messages_json=excluded.messages_json', _session_to_row(s), ) conn.commit() finally: conn.close() def _db_get_by_id(session_id: str) -> Optional[SessionState]: if not _db_enabled(): return None with _db_lock: conn = _db_conn() try: cur = conn.execute( 'SELECT session_id, player, mode, created_at, updated_at, messages_json ' 'FROM sessions WHERE session_id = ? LIMIT 1', (session_id,), ) row = cur.fetchone() finally: conn.close() if not row: return None return _row_to_session(row) def _db_get_by_player_mode(player: str, mode: str) -> Optional[SessionState]: if not _db_enabled(): return None with _db_lock: conn = _db_conn() try: cur = conn.execute( 'SELECT session_id, player, mode, created_at, updated_at, messages_json ' 'FROM sessions WHERE player = ? AND mode = ? ORDER BY updated_at DESC LIMIT 1', (player, mode), ) row = cur.fetchone() finally: conn.close() if not row: return None return _row_to_session(row) def _db_delete(session_id: str): if not _db_enabled(): return with _db_lock: conn = _db_conn() try: conn.execute('DELETE FROM sessions WHERE session_id = ?', (session_id,)) conn.commit() finally: conn.close() def _db_cleanup_expired(ttl_seconds: int) -> int: if not _db_enabled(): return 0 cutoff = time.time() - ttl_seconds with _db_lock: conn = _db_conn() try: cur = conn.execute('DELETE FROM sessions WHERE updated_at < ?', (cutoff,)) conn.commit() return int(cur.rowcount or 0) finally: conn.close() def _cleanup_sessions(): ttl = int(CFG.get('session_ttl_seconds', 21600)) now = time.time() with _sessions_lock: dead = [sid for sid, s in _sessions.items() if now - s.updated_at > ttl] for sid in dead: _sessions.pop(sid, None) if dead: log.info('Cleaned %d expired sessions', len(dead)) db_dead = _db_cleanup_expired(ttl) if db_dead: log.info('Cleaned %d expired DB sessions', db_dead) def _session_get(session_id: str) -> SessionState: _cleanup_sessions() with _sessions_lock: s = _sessions.get(session_id) if not s: s = _db_get_by_id(session_id) if s: with _sessions_lock: _sessions[session_id] = s if not s: raise HTTPException(status_code=404, detail='session not found') return s def _session_create(player: str, mode: str) -> SessionState: _cleanup_sessions() with _sessions_lock: for existing in _sessions.values(): if existing.player == player and existing.mode == mode: existing.updated_at = time.time() _db_upsert_session(existing) return existing persisted = _db_get_by_player_mode(player, mode) if persisted: persisted.updated_at = time.time() with _sessions_lock: _sessions[persisted.session_id] = persisted _db_upsert_session(persisted) return persisted sid = 'sess_' + uuid.uuid4().hex[:16] s = SessionState(session_id=sid, player=player, mode=mode) with _sessions_lock: _sessions[sid] = s _db_upsert_session(s) return s def _ollama_chat(model: str, messages: List[Dict[str, str]], *, fmt: Optional[str] = None, temperature: float = 0.7, max_tokens: int = 400, timeout: int = 60) -> str: payload: Dict[str, Any] = { 'model': model, 'messages': messages, 'stream': False, 'options': { 'temperature': temperature, 'num_predict': max_tokens, } } if fmt: payload['format'] = fmt r = requests.post(f"{CFG['ollama_url']}/api/chat", json=payload, timeout=timeout) r.raise_for_status() return r.json()['message']['content'] def _parse_json(content: str) -> Dict[str, Any]: try: return json.loads(content) except json.JSONDecodeError: # salvage commands if partially valid cmds = [] m = re_search_commands(content) if m: cmds = m return {'commands': cmds, 'message': ''} def re_search_commands(content: str) -> List[str]: import re m = re.search(r'"commands"\s*:\s*\[(.*?)(?:\]|$)', content, re.DOTALL) if not m: return [] return re.findall(r'"([^"]+)"', m.group(1)) def tool_web_search(query: str) -> Dict[str, Any]: try: r = requests.get('https://api.duckduckgo.com/', params={ 'q': query, 'format': 'json', 'no_redirect': 1, 'no_html': 1, }, timeout=20) r.raise_for_status() data = r.json() out = [] if data.get('AbstractText'): out.append({'title': 'Abstract', 'text': data['AbstractText']}) for item in (data.get('RelatedTopics') or [])[:3]: if isinstance(item, dict) and item.get('Text'): out.append({'title': item.get('FirstURL', ''), 'text': item['Text']}) return {'ok': True, 'results': out} except Exception as e: return {'ok': False, 'error': str(e), 'results': []} def tool_wiki_lookup(query: str) -> Dict[str, Any]: try: s = requests.get('https://minecraft.wiki/api.php', params={ 'action': 'opensearch', 'format': 'json', 'search': query, 'limit': 3, }, timeout=20) s.raise_for_status() data = s.json() titles = data[1] if len(data) > 1 else [] if not titles: return {'ok': True, 'results': []} results = [] for t in titles: e = requests.get('https://minecraft.wiki/api.php', params={ 'action': 'query', 'format': 'json', 'prop': 'extracts', 'exintro': 1, 'explaintext': 1, 'titles': t, }, timeout=20) e.raise_for_status() pages = e.json().get('query', {}).get('pages', {}) extract = '' for p in pages.values(): extract = (p.get('extract') or '')[:500] break results.append({'title': t, 'text': extract}) return {'ok': True, 'results': results} except Exception as e: return {'ok': False, 'error': str(e), 'results': []} def _tool_router(user_text: str, max_steps: int) -> List[Dict[str, Any]]: """Very small bounded heuristic tool planner.""" text = user_text.lower() calls: List[Dict[str, Any]] = [] if max_steps <= 0: return calls if any(k in text for k in ['wiki', 'minecraft', 'item id', 'recipe', 'craft']): calls.append({'tool': 'minecraft.wiki_lookup', 'query': user_text}) if len(calls) < max_steps and any(k in text for k in ['what is', 'how to', 'search', 'lookup']): calls.append({'tool': 'web.search', 'query': user_text}) return calls[:max_steps] def _commands_prompt(mode: str) -> str: allowed = ','.join( p.strip() for p in COMMAND_PREFIXES_BY_MODE.get(mode, COMMAND_PREFIXES_BY_MODE['god']) ) if mode == 'sudo': return ( 'You are a Minecraft command translator. Return ONLY JSON: {"commands": ["..."]}.\n' f'Allowed command prefixes: {allowed}.\n' 'Output must be command strings only, no prose, no markdown, no labels, no leading slash.\n' 'If unsafe/unknown, return empty commands.' ) if mode == 'god_system': return ( 'You are Minecraft divine system automation. Return ONLY JSON: {"commands": ["..."]}.\n' f'Allowed command prefixes: {allowed}.\n' 'Output must be command strings only, no prose, no markdown, no labels, no leading slash.\n' 'This mode is for intervention/first-login events. Prefer benevolent or thematic world actions.\n' 'If you include kill commands, keep it to at most one player.' ) return ( 'You are Minecraft God command planner. Return ONLY JSON: {"commands": ["..."]}.\n' f'Allowed command prefixes: {allowed}.\n' 'Output must be command strings only, no prose, no markdown, no labels, no leading slash.\n' 'Balance benevolence and judgment based on context.\n' 'Use valid Minecraft command syntax only.' ) def _message_prompt(mode: str) -> str: if mode == 'sudo': return 'Return empty string.' if mode == 'god_system': return ( 'You are God speaking to all players about a system event. ' 'Return plain text only, no JSON.' ) return ( 'You are God in Minecraft. Return a dramatic but clear message as plain text only.' ) def _sanitize_commands(commands_raw: Any, mode: str) -> List[str]: allowed_prefixes = COMMAND_PREFIXES_BY_MODE.get(mode, COMMAND_PREFIXES_BY_MODE['god']) max_commands = int(CFG.get('gateway_max_commands', 8)) cleaned: List[str] = [] if not isinstance(commands_raw, list): return [] for entry in commands_raw: if not isinstance(entry, str): continue pieces = re.split(r'[\n;]+', entry) for piece in pieces: cmd = piece.strip() if not cmd: continue if cmd.startswith('/'): cmd = cmd[1:].strip() cmd = cmd.rstrip(' .') if len(cmd) < 3 or len(cmd) > 240: continue if cmd.lower().startswith('commands'): continue if not any(cmd.startswith(p) for p in allowed_prefixes): continue cleaned.append(cmd) if len(cleaned) >= max_commands: break if len(cleaned) >= max_commands: break deduped: List[str] = [] seen = set() for cmd in cleaned: if cmd in seen: continue seen.add(cmd) deduped.append(cmd) return deduped def run_pipeline(session: SessionState, req: MessageRequest) -> MessageResponse: session.updated_at = time.time() # Compose input text context_json = json.dumps(req.context or {}, ensure_ascii=True) user_text = req.text.strip() user_blob = f"message: {user_text}\ncontext: {context_json}" session.messages.append({'role': req.role, 'content': user_blob}) _db_upsert_session(session) tool_trace: List[Dict[str, Any]] = [] tool_results_block = '' if req.allow_tools: calls = _tool_router(user_text, max(0, min(req.max_tool_steps, 6))) for c in calls: tool = c['tool'] q = c['query'] if tool == 'web.search': out = tool_web_search(q) elif tool == 'minecraft.wiki_lookup': out = tool_wiki_lookup(q) else: out = {'ok': False, 'error': 'unknown tool', 'results': []} tool_trace.append({'tool': tool, 'input': q, 'ok': out.get('ok', False), 'results_count': len(out.get('results', []))}) tool_results_block += f"\nTOOL {tool} query={q}\nRESULT={json.dumps(out, ensure_ascii=True)[:3000]}\n" # Commands call cmd_messages = [ {'role': 'system', 'content': _commands_prompt(session.mode)}, *session.messages[-12:], {'role': 'user', 'content': user_blob + tool_results_block}, ] cmd_raw = _ollama_chat( CFG.get('command_model', 'qwen3-coder:30b'), cmd_messages, fmt='json', temperature=0.2, max_tokens=220, ) cmd_parsed = _parse_json(cmd_raw) commands = _sanitize_commands(cmd_parsed.get('commands') or [], session.mode) # Message call (not for sudo) message = None if session.mode != 'sudo': msg_messages = [ {'role': 'system', 'content': _message_prompt(session.mode)}, *session.messages[-12:], {'role': 'user', 'content': user_blob + f"\nChosen commands: {commands}" + tool_results_block}, ] message = _ollama_chat( CFG.get('message_model', 'gemma3:12b'), msg_messages, fmt=None, temperature=0.8, max_tokens=500, ).strip() # Save assistant summary back to session memory session.messages.append({ 'role': 'assistant', 'content': json.dumps({'message': message, 'commands': commands}, ensure_ascii=True) }) _db_upsert_session(session) return MessageResponse(message=message, commands=commands, tool_trace=tool_trace) app = FastAPI(title='Minecraft LangGraph Gateway', version='0.1.0') @app.get('/healthz') def healthz(): return {'ok': True, 'sessions': len(_sessions)} @app.post('/v1/session/start', response_model=StartSessionResponse) def start_session(req: StartSessionRequest): s = _session_create(req.player, req.mode) log.info('session start player=%s mode=%s session=%s', req.player, req.mode, s.session_id) return StartSessionResponse(session_id=s.session_id) @app.post('/v1/session/{session_id}/message', response_model=MessageResponse) def send_message(session_id: str, req: MessageRequest): session = _session_get(session_id) out = run_pipeline(session, req) return out @app.post('/v1/session/{session_id}/close') def close_session(session_id: str): with _sessions_lock: existed = session_id in _sessions _sessions.pop(session_id, None) if not existed and _db_enabled(): existed = _db_get_by_id(session_id) is not None _db_delete(session_id) return {'closed': existed} _db_init()