""" Mortdecai Gateway MCP Server. Wraps the gateway HTTP API as MCP tools so Claude can operate Mortdecai natively. All game operations go through the gateway — this server never touches Minecraft directly. Tool groups: - Gateway lifecycle (start, stop, restart, status, health) - Player commands (gateway_command) - Brain management (hot-swap providers) - Session management - Bot playtesting (run profiles against the gateway) - Diagnostics (read interactions, analyze errors) - Escalation (write notes for architect sessions) - Logs """ import json import subprocess import time as _time from datetime import datetime from pathlib import Path import httpx import yaml from mcp.server.fastmcp import FastMCP GATEWAY_URL = "http://localhost:8500" CLI_DIR = Path(__file__).parent.parent SCRIPTS_DIR = CLI_DIR / "scripts" CONFIG_DIR = CLI_DIR / "config" DATA_DIR = CLI_DIR / "data" ESCALATION_DIR = DATA_DIR / "escalations" PLAYTEST_DIR = DATA_DIR / "playtests" INTERACTION_DIR = Path.home() / "bin" / "Mortdecai-2.0" / "data" / "interactions" # Ensure data dirs exist for d in [DATA_DIR, ESCALATION_DIR, PLAYTEST_DIR]: d.mkdir(parents=True, exist_ok=True) mcp = FastMCP("mortdecai-gateway") async def _get(path: str) -> dict: async with httpx.AsyncClient(timeout=10) as client: resp = await client.get(f"{GATEWAY_URL}{path}") resp.raise_for_status() return resp.json() async def _post(path: str, body: dict | None = None) -> dict: async with httpx.AsyncClient(timeout=90) as client: resp = await client.post(f"{GATEWAY_URL}{path}", json=body or {}) resp.raise_for_status() return resp.json() async def _patch(path: str, body: dict | None = None) -> dict: async with httpx.AsyncClient(timeout=10) as client: resp = await client.patch(f"{GATEWAY_URL}{path}", json=body or {}) resp.raise_for_status() return resp.json() # --- Gateway lifecycle --- @mcp.tool() def gateway_start() -> str: """Start the Mortdecai gateway if not running.""" result = subprocess.run( ["bash", str(SCRIPTS_DIR / "start-gateway.sh")], capture_output=True, text=True, timeout=30, ) return result.stdout + result.stderr @mcp.tool() def gateway_stop() -> str: """Stop the Mortdecai gateway.""" result = subprocess.run( ["bash", str(SCRIPTS_DIR / "stop-gateway.sh")], capture_output=True, text=True, timeout=10, ) return result.stdout + result.stderr @mcp.tool() def gateway_restart() -> str: """Restart the Mortdecai gateway (stop + start).""" stop = subprocess.run( ["bash", str(SCRIPTS_DIR / "stop-gateway.sh")], capture_output=True, text=True, timeout=10, ) _time.sleep(2) start = subprocess.run( ["bash", str(SCRIPTS_DIR / "start-gateway.sh")], capture_output=True, text=True, timeout=30, ) return stop.stdout + start.stdout + start.stderr @mcp.tool() async def gateway_status() -> str: """Get full gateway status: providers, sessions, oracle state.""" try: data = await _get("/v2/status") return json.dumps(data, indent=2) except httpx.ConnectError: return "Gateway is DOWN — not reachable on port 8500" except Exception as e: return f"Error: {e}" @mcp.tool() async def gateway_health() -> str: """Quick health check — is the gateway alive?""" try: data = await _get("/v2/health") return json.dumps(data, indent=2) except httpx.ConnectError: return "DOWN" except Exception as e: return f"Error: {e}" # --- Player commands (through gateway) --- @mcp.tool() async def gateway_command( player: str, text: str, mode: str = "sudo", server: str = "dev", ) -> str: """Send a command through the gateway as if a player typed it in-game. Args: player: Minecraft player name text: The command text (e.g. "give me a diamond") mode: Command mode — sudo, pray, ask, or raw server: Server target — dev or prod """ try: data = await _post("/v2/quick", { "player": player, "text": text, "server": server, "command_type": mode, }) return json.dumps(data, indent=2) except Exception as e: return f"Error: {e}" # --- Brain management (hot-swap providers) --- @mcp.tool() async def gateway_brain_set( role: str, provider: str, model: str, ) -> str: """Hot-swap the AI provider and model for a gateway role. Args: role: Agent role — eye, hand, voice, opus, architect, orchestrator provider: Provider name — anthropic, codex, openai, ollama, regex model: Model identifier (e.g. "gpt-5.1-codex-mini", "claude-opus-4-20250514") """ try: data = await _patch(f"/v2/brain/{role}", { "provider": provider, "model": model, }) return json.dumps(data, indent=2) except Exception as e: return f"Error: {e}" @mcp.tool() async def gateway_brain_save(role: str) -> str: """Persist a brain's current live override to agents.yaml on disk. Args: role: Agent role to save """ try: data = await _post(f"/v2/brain/{role}/save") return json.dumps(data, indent=2) except Exception as e: return f"Error: {e}" @mcp.tool() async def gateway_brain_reload(role: str) -> str: """Clear in-memory override, reload brain config from agents.yaml. Args: role: Agent role to reload """ try: data = await _post(f"/v2/brain/{role}/reload") return json.dumps(data, indent=2) except Exception as e: return f"Error: {e}" # --- Session management --- @mcp.tool() async def gateway_sessions_clear(player: str, mode: str = "") -> str: """Clear sessions for a player. Optionally filter by mode. Args: player: Player name mode: Optional mode filter (sudo, pray, ask, raw). Empty = all modes. """ try: url = f"/v2/sessions/clear/{player}" if mode: url += f"?mode={mode}" data = await _post(url) return json.dumps(data, indent=2) except Exception as e: return f"Error: {e}" @mcp.tool() async def gateway_sessions_reset() -> str: """Clear ALL sessions for ALL players. Use with caution.""" try: data = await _post("/v2/sessions/reset") return json.dumps(data, indent=2) except Exception as e: return f"Error: {e}" # --- Bot Playtesting --- @mcp.tool() def list_bot_profiles() -> str: """List available bot profiles for playtesting.""" profiles_path = CONFIG_DIR / "bot-profiles.yaml" if not profiles_path.exists(): return "No bot profiles found at config/bot-profiles.yaml" with open(profiles_path) as f: data = yaml.safe_load(f) profiles = data.get("profiles", {}) lines = [] for name, profile in profiles.items(): cmd_count = len(profile.get("commands", [])) lines.append(f" {name}: {profile.get('description', '')} ({cmd_count} commands)") return f"Available profiles ({len(profiles)}):\n" + "\n".join(lines) @mcp.tool() async def run_playtest(profile: str, server: str = "dev") -> str: """Run a bot profile's commands through the gateway and collect results. Sends each command sequentially, records status/response/tools for each. Results are saved to data/playtests/ for later analysis. Args: profile: Bot profile name (e.g. "noob", "griefer", "builder") server: Server target — dev or prod """ profiles_path = CONFIG_DIR / "bot-profiles.yaml" if not profiles_path.exists(): return "No bot profiles config found" with open(profiles_path) as f: data = yaml.safe_load(f) profiles = data.get("profiles", {}) if profile not in profiles: return f"Unknown profile: {profile}. Available: {list(profiles.keys())}" bot = profiles[profile] player = bot.get("player_name", f"Test{profile.title()}") commands = bot.get("commands", []) results = [] for cmd in commands: mode = cmd.get("mode", "sudo") text = cmd.get("text", "") try: resp = await _post("/v2/quick", { "player": player, "text": text, "server": server, "command_type": mode, }) results.append({ "mode": mode, "text": text, "status": resp.get("status", "unknown"), "response": (resp.get("response_text") or "")[:200], "tools_used": [t.get("tool") for t in resp.get("tool_trace", [])], "commands_executed": resp.get("commands_executed", []), "error": None, }) except Exception as e: results.append({ "mode": mode, "text": text, "status": "error", "response": "", "tools_used": [], "commands_executed": [], "error": str(e), }) # Brief pause between commands to avoid overwhelming _time.sleep(1) # Summarize total = len(results) passed = sum(1 for r in results if r["status"] == "completed" and not r["error"]) failed = sum(1 for r in results if r["status"] != "completed" or r["error"]) no_tools = sum(1 for r in results if r["status"] == "completed" and not r["tools_used"]) report = { "profile": profile, "player": player, "timestamp": datetime.now().isoformat(), "summary": { "total": total, "passed": passed, "failed": failed, "no_tools_used": no_tools, }, "results": results, } # Save report filename = f"{datetime.now().strftime('%Y%m%d-%H%M')}-{profile}.json" report_path = PLAYTEST_DIR / filename report_path.write_text(json.dumps(report, indent=2)) # Return summary summary_lines = [f"Playtest: {profile} ({player}) — {passed}/{total} passed, {failed} failed, {no_tools} no-tool-use"] for r in results: status_icon = "OK" if r["status"] == "completed" and not r["error"] else "FAIL" tool_str = ",".join(r["tools_used"]) if r["tools_used"] else "NO_TOOLS" summary_lines.append(f" [{status_icon}] /{r['mode']} {r['text'][:50]} → {tool_str}") if r["error"]: summary_lines.append(f" error: {r['error'][:100]}") summary_lines.append(f"\nReport saved: {report_path}") return "\n".join(summary_lines) @mcp.tool() def list_playtest_reports(limit: int = 10) -> str: """List recent playtest reports. Args: limit: Max number of reports to show (default 10) """ reports = sorted(PLAYTEST_DIR.glob("*.json"), reverse=True)[:limit] if not reports: return "No playtest reports found" lines = [] for r in reports: try: data = json.loads(r.read_text()) s = data.get("summary", {}) lines.append(f" {r.name}: {data.get('profile')} — {s.get('passed',0)}/{s.get('total',0)} passed") except Exception: lines.append(f" {r.name}: (unreadable)") return f"Recent reports ({len(reports)}):\n" + "\n".join(lines) # --- Diagnostics --- @mcp.tool() def read_interactions(date: str = "", limit: int = 20) -> str: """Read recent gateway interaction logs for analysis. Args: date: Date string YYYY-MM-DD (default: today) limit: Max interactions to return (default 20) """ if not date: date = datetime.now().strftime("%Y-%m-%d") log_path = INTERACTION_DIR / f"{date}.jsonl" if not log_path.exists(): return f"No interaction log for {date}" lines = log_path.read_text().strip().split("\n") recent = lines[-limit:] results = [] for line in recent: try: d = json.loads(line) tools = [t.get("tool") for t in d.get("tool_trace", [])] results.append({ "player": d.get("player"), "mode": d.get("mode"), "message": (d.get("message") or "")[:80], "status": d.get("status"), "tools": tools, "has_commands": bool(d.get("commands_executed")), "response_preview": (d.get("response_text") or "")[:100], }) except Exception: continue return json.dumps(results, indent=2) @mcp.tool() def analyze_errors(date: str = "", hours: int = 4) -> str: """Analyze recent gateway logs and interactions for error patterns. Checks for: repeated errors, tool-use failures, timeouts, empty responses, session poisoning (text-only responses with no tool calls). Args: date: Date string YYYY-MM-DD (default: today) hours: How many hours back to analyze (default 4) """ issues = [] # Check gateway log for errors log_path = Path("/tmp/mortdecai-gateway.log") if log_path.exists(): try: log_text = log_path.read_text() error_lines = [l for l in log_text.split("\n") if "ERROR" in l or "Traceback" in l] if error_lines: issues.append({ "type": "gateway_errors", "count": len(error_lines), "recent": error_lines[-3:], }) except Exception: pass # Check interaction logs if not date: date = datetime.now().strftime("%Y-%m-%d") interaction_path = INTERACTION_DIR / f"{date}.jsonl" if interaction_path.exists(): cutoff = _time.time() - (hours * 3600) interactions = [] for line in interaction_path.read_text().strip().split("\n"): try: d = json.loads(line) if d.get("timestamp", 0) > cutoff: interactions.append(d) except Exception: continue # Check for text-only responses (no tool calls) no_tools = [i for i in interactions if not i.get("tool_trace") and i.get("status") == "completed"] if no_tools: issues.append({ "type": "no_tool_use", "count": len(no_tools), "description": "Completed responses with no tool calls (model responded with text only)", "examples": [{"player": i.get("player"), "mode": i.get("mode"), "msg": i.get("message", "")[:60]} for i in no_tools[:3]], }) # Check for errors/timeouts errors = [i for i in interactions if i.get("status") in ("error", "timeout")] if errors: issues.append({ "type": "request_failures", "count": len(errors), "examples": [{"player": i.get("player"), "mode": i.get("mode"), "status": i.get("status"), "msg": i.get("message", "")[:60]} for i in errors[:3]], }) # Check for empty responses empty = [i for i in interactions if not i.get("response_text") and i.get("status") == "completed"] if empty: issues.append({ "type": "empty_responses", "count": len(empty), "examples": [{"player": i.get("player"), "mode": i.get("mode"), "msg": i.get("message", "")[:60]} for i in empty[:3]], }) if not issues: return f"No issues found in the last {hours} hours." return json.dumps({"issues_found": len(issues), "issues": issues}, indent=2) # --- Escalation --- @mcp.tool() def write_escalation( title: str, severity: str, description: str, evidence: str = "", suggested_fix: str = "", ) -> str: """Write an escalation note for the architect session (Seth + Claude). Use this when you find an issue you cannot or should not fix yourself. Args: title: Short title for the issue severity: low, medium, high, critical description: What's wrong and how you discovered it evidence: Log lines, interaction data, or other evidence suggested_fix: Your recommendation for how to fix it (optional) """ note = { "title": title, "severity": severity, "description": description, "evidence": evidence, "suggested_fix": suggested_fix, "timestamp": datetime.now().isoformat(), "status": "open", } filename = f"{datetime.now().strftime('%Y%m%d-%H%M')}-{title[:40].replace(' ', '-').lower()}.json" path = ESCALATION_DIR / filename path.write_text(json.dumps(note, indent=2)) return f"Escalation written: {path}" @mcp.tool() def list_escalations(status: str = "open") -> str: """List escalation notes, optionally filtered by status. Args: status: Filter by status — open, resolved, all (default: open) """ files = sorted(ESCALATION_DIR.glob("*.json"), reverse=True) if not files: return "No escalations found" notes = [] for f in files: try: data = json.loads(f.read_text()) if status != "all" and data.get("status") != status: continue notes.append(f" [{data.get('severity','?').upper()}] {data.get('title','')} ({f.name})") except Exception: continue if not notes: return f"No {status} escalations" return f"Escalations ({len(notes)}):\n" + "\n".join(notes) @mcp.tool() def read_escalation(filename: str) -> str: """Read a specific escalation note. Args: filename: Escalation filename (from list_escalations) """ path = ESCALATION_DIR / filename if not path.exists(): return f"Not found: {filename}" return path.read_text() # --- Logs --- # --- Session Notes (persistent memory across runs) --- @mcp.tool() def write_note(topic: str, content: str) -> str: """Save a learning or observation that should persist across runs. Use for: patterns discovered, things that work, things that don't, provider quirks, player behavior patterns, diagnostic findings. Keep notes focused and factual. One topic per note. Args: topic: Short topic key (e.g. "codex-tool-compliance", "griefer-patterns") content: The observation or learning """ notes_dir = DATA_DIR / "notes" notes_dir.mkdir(exist_ok=True) note_path = notes_dir / f"{topic}.md" entry = f"\n## {datetime.now().strftime('%Y-%m-%d %H:%M')}\n\n{content}\n" if note_path.exists(): # Append to existing topic with open(note_path, "a") as f: f.write(entry) else: # New topic with open(note_path, "w") as f: f.write(f"# {topic}\n{entry}") return f"Note saved: {note_path}" @mcp.tool() def read_notes(topic: str = "") -> str: """Read session notes. If topic is empty, lists all topics. Args: topic: Topic key to read, or empty to list all """ notes_dir = DATA_DIR / "notes" if not notes_dir.exists(): return "No notes yet" if not topic: files = sorted(notes_dir.glob("*.md")) if not files: return "No notes yet" lines = [] for f in files: size = f.stat().st_size lines.append(f" {f.stem} ({size} bytes)") return f"Topics ({len(files)}):\n" + "\n".join(lines) note_path = notes_dir / f"{topic}.md" if not note_path.exists(): return f"No notes for topic: {topic}" return note_path.read_text() @mcp.tool() def write_session_summary(summary: str) -> str: """Write a summary of this run's findings and actions. Call this at the end of every scheduled run. Keeps a rolling log of what happened, what was fixed, what was escalated. Args: summary: Brief summary of this run (findings, actions, escalations) """ log_path = DATA_DIR / "run-log.md" entry = f"\n## {datetime.now().strftime('%Y-%m-%d %H:%M')}\n\n{summary}\n\n---\n" with open(log_path, "a") as f: f.write(entry) # Keep run log under 50KB (trim oldest entries) if log_path.stat().st_size > 50_000: text = log_path.read_text() sections = text.split("\n---\n") trimmed = "\n---\n".join(sections[-(len(sections) // 2):]) log_path.write_text(trimmed) return f"Session summary saved to {log_path}" @mcp.tool() def read_run_log(entries: int = 5) -> str: """Read recent run summaries. Args: entries: Number of recent entries to show (default 5) """ log_path = DATA_DIR / "run-log.md" if not log_path.exists(): return "No run log yet — this is the first run" text = log_path.read_text() sections = text.split("\n---\n") recent = sections[-entries:] if len(sections) > entries else sections return "\n---\n".join(recent) # --- Logs --- @mcp.tool() def gateway_logs(lines: int = 50) -> str: """Read recent gateway log output. Args: lines: Number of lines to read from the end (default 50) """ log_path = Path("/tmp/mortdecai-gateway.log") if not log_path.exists(): return "No gateway log file found" try: result = subprocess.run( ["tail", f"-{lines}", str(log_path)], capture_output=True, text=True, timeout=5, ) return result.stdout or "Log file is empty" except Exception as e: return f"Error reading logs: {e}" if __name__ == "__main__": mcp.run(transport="stdio")