Files
Claude Code 8b740f3ec1 feat: external brain tools — queue, tool_execute, brain_mode_set
CLI can now BE the brain:
- brain_mode_set: switch gateway to external mode
- queue_get: poll incoming player commands
- queue_complete: send response back to player
- tool_execute: call any gateway tool directly (rcon, display, npc, etc.)

28 MCP tools total. When in external mode, Opus in the CLI
processes player commands instead of Codex/Anthropic.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-28 20:10:15 -04:00

792 lines
24 KiB
Python

"""
Mortdecai Gateway MCP Server.
Wraps the gateway HTTP API as MCP tools so Claude can operate
Mortdecai natively. All game operations go through the gateway —
this server never touches Minecraft directly.
Tool groups:
- Gateway lifecycle (start, stop, restart, status, health)
- Player commands (gateway_command)
- Brain management (hot-swap providers)
- Session management
- Bot playtesting (run profiles against the gateway)
- Diagnostics (read interactions, analyze errors)
- Escalation (write notes for architect sessions)
- Logs
"""
import json
import subprocess
import time as _time
from datetime import datetime
from pathlib import Path
import httpx
import yaml
from mcp.server.fastmcp import FastMCP
GATEWAY_URL = "http://localhost:8500"
CLI_DIR = Path(__file__).parent.parent
SCRIPTS_DIR = CLI_DIR / "scripts"
CONFIG_DIR = CLI_DIR / "config"
DATA_DIR = CLI_DIR / "data"
ESCALATION_DIR = DATA_DIR / "escalations"
PLAYTEST_DIR = DATA_DIR / "playtests"
INTERACTION_DIR = Path.home() / "bin" / "Mortdecai-2.0" / "data" / "interactions"
# Ensure data dirs exist
for d in [DATA_DIR, ESCALATION_DIR, PLAYTEST_DIR]:
d.mkdir(parents=True, exist_ok=True)
mcp = FastMCP("mortdecai-gateway")
async def _get(path: str) -> dict:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.get(f"{GATEWAY_URL}{path}")
resp.raise_for_status()
return resp.json()
async def _post(path: str, body: dict | None = None) -> dict:
async with httpx.AsyncClient(timeout=90) as client:
resp = await client.post(f"{GATEWAY_URL}{path}", json=body or {})
resp.raise_for_status()
return resp.json()
async def _patch(path: str, body: dict | None = None) -> dict:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.patch(f"{GATEWAY_URL}{path}", json=body or {})
resp.raise_for_status()
return resp.json()
# --- Gateway lifecycle ---
@mcp.tool()
def gateway_start() -> str:
"""Start the Mortdecai gateway if not running."""
result = subprocess.run(
["bash", str(SCRIPTS_DIR / "start-gateway.sh")],
capture_output=True, text=True, timeout=30,
)
return result.stdout + result.stderr
@mcp.tool()
def gateway_stop() -> str:
"""Stop the Mortdecai gateway."""
result = subprocess.run(
["bash", str(SCRIPTS_DIR / "stop-gateway.sh")],
capture_output=True, text=True, timeout=10,
)
return result.stdout + result.stderr
@mcp.tool()
def gateway_restart() -> str:
"""Restart the Mortdecai gateway (stop + start)."""
stop = subprocess.run(
["bash", str(SCRIPTS_DIR / "stop-gateway.sh")],
capture_output=True, text=True, timeout=10,
)
_time.sleep(2)
start = subprocess.run(
["bash", str(SCRIPTS_DIR / "start-gateway.sh")],
capture_output=True, text=True, timeout=30,
)
return stop.stdout + start.stdout + start.stderr
@mcp.tool()
async def gateway_status() -> str:
"""Get full gateway status: providers, sessions, oracle state."""
try:
data = await _get("/v2/status")
return json.dumps(data, indent=2)
except httpx.ConnectError:
return "Gateway is DOWN — not reachable on port 8500"
except Exception as e:
return f"Error: {e}"
@mcp.tool()
async def gateway_health() -> str:
"""Quick health check — is the gateway alive?"""
try:
data = await _get("/v2/health")
return json.dumps(data, indent=2)
except httpx.ConnectError:
return "DOWN"
except Exception as e:
return f"Error: {e}"
# --- Brain mode + command queue ---
@mcp.tool()
async def brain_mode_set(mode: str) -> str:
"""Switch gateway between internal (AI loop) and external (CLI brain) mode.
In external mode, player commands are queued for YOU to handle.
In internal mode, the gateway's own AI (Codex/Anthropic) handles them.
Args:
mode: "internal" or "external"
"""
try:
data = await _patch(f"/v2/brain-mode?mode={mode}")
return json.dumps(data, indent=2)
except Exception as e:
return f"Error: {e}"
@mcp.tool()
async def queue_get() -> str:
"""Get pending player commands waiting for you to handle.
Only works when brain_mode is 'external'. Returns commands with
player name, mode, message, world context, and player context.
Process each command by calling tools, then complete it with queue_complete.
"""
try:
data = await _get("/v2/queue")
return json.dumps(data, indent=2)
except Exception as e:
return f"Error: {e}"
@mcp.tool()
async def queue_complete(cmd_id: str, response_text: str) -> str:
"""Complete a queued command — sends the response back to the player.
Args:
cmd_id: Command ID from queue_get
response_text: The message to send to the player
"""
try:
data = await _post(f"/v2/queue/complete/{cmd_id}", {
"response_text": response_text,
"player_message": response_text,
})
return json.dumps(data, indent=2)
except Exception as e:
return f"Error: {e}"
@mcp.tool()
async def tool_execute(
tool: str,
params: str = "{}",
player: str = "",
server: str = "dev",
) -> str:
"""Execute a Mortdecai gateway tool directly. YOU are the brain.
Available tools: rcon_execute, rcon_query, display_send, display_interactive,
npc_spawn, npc_bulk_spawn, npc_despawn, npc_bulk_despawn, npc_list,
npc_command, npc_script_write, npc_script_read, eye_players, eye_world,
eye_events, memory_read, memory_write, history_read, logs_read,
sound_play, world_query, schem_list, schem_place, schem_download,
creative_name, perms_manage
Args:
tool: Tool name (e.g. "rcon_execute")
params: JSON string of tool parameters
player: Player context (for session lookup)
server: Server target — dev or prod
"""
try:
tool_params = json.loads(params) if isinstance(params, str) else params
except json.JSONDecodeError:
return f"Invalid JSON params: {params}"
try:
data = await _post("/v2/tools/execute", {
"tool": tool,
"params": tool_params,
"player": player,
"server": server,
})
return json.dumps(data, indent=2)
except Exception as e:
return f"Error: {e}"
# --- Player commands (through gateway) ---
@mcp.tool()
async def gateway_command(
player: str,
text: str,
mode: str = "sudo",
server: str = "dev",
) -> str:
"""Send a command through the gateway as if a player typed it in-game.
Args:
player: Minecraft player name
text: The command text (e.g. "give me a diamond")
mode: Command mode — sudo, pray, ask, or raw
server: Server target — dev or prod
"""
try:
data = await _post("/v2/quick", {
"player": player,
"text": text,
"server": server,
"command_type": mode,
})
return json.dumps(data, indent=2)
except Exception as e:
return f"Error: {e}"
# --- Brain management (hot-swap providers) ---
@mcp.tool()
async def gateway_brain_set(
role: str,
provider: str,
model: str,
) -> str:
"""Hot-swap the AI provider and model for a gateway role.
Args:
role: Agent role — eye, hand, voice, opus, architect, orchestrator
provider: Provider name — anthropic, codex, openai, ollama, regex
model: Model identifier (e.g. "gpt-5.1-codex-mini", "claude-opus-4-20250514")
"""
try:
data = await _patch(f"/v2/brain/{role}", {
"provider": provider,
"model": model,
})
return json.dumps(data, indent=2)
except Exception as e:
return f"Error: {e}"
@mcp.tool()
async def gateway_brain_save(role: str) -> str:
"""Persist a brain's current live override to agents.yaml on disk.
Args:
role: Agent role to save
"""
try:
data = await _post(f"/v2/brain/{role}/save")
return json.dumps(data, indent=2)
except Exception as e:
return f"Error: {e}"
@mcp.tool()
async def gateway_brain_reload(role: str) -> str:
"""Clear in-memory override, reload brain config from agents.yaml.
Args:
role: Agent role to reload
"""
try:
data = await _post(f"/v2/brain/{role}/reload")
return json.dumps(data, indent=2)
except Exception as e:
return f"Error: {e}"
# --- Session management ---
@mcp.tool()
async def gateway_sessions_clear(player: str, mode: str = "") -> str:
"""Clear sessions for a player. Optionally filter by mode.
Args:
player: Player name
mode: Optional mode filter (sudo, pray, ask, raw). Empty = all modes.
"""
try:
url = f"/v2/sessions/clear/{player}"
if mode:
url += f"?mode={mode}"
data = await _post(url)
return json.dumps(data, indent=2)
except Exception as e:
return f"Error: {e}"
@mcp.tool()
async def gateway_sessions_reset() -> str:
"""Clear ALL sessions for ALL players. Use with caution."""
try:
data = await _post("/v2/sessions/reset")
return json.dumps(data, indent=2)
except Exception as e:
return f"Error: {e}"
# --- Bot Playtesting ---
@mcp.tool()
def list_bot_profiles() -> str:
"""List available bot profiles for playtesting."""
profiles_path = CONFIG_DIR / "bot-profiles.yaml"
if not profiles_path.exists():
return "No bot profiles found at config/bot-profiles.yaml"
with open(profiles_path) as f:
data = yaml.safe_load(f)
profiles = data.get("profiles", {})
lines = []
for name, profile in profiles.items():
cmd_count = len(profile.get("commands", []))
lines.append(f" {name}: {profile.get('description', '')} ({cmd_count} commands)")
return f"Available profiles ({len(profiles)}):\n" + "\n".join(lines)
@mcp.tool()
async def run_playtest(profile: str, server: str = "dev") -> str:
"""Run a bot profile's commands through the gateway and collect results.
Sends each command sequentially, records status/response/tools for each.
Results are saved to data/playtests/ for later analysis.
Args:
profile: Bot profile name (e.g. "noob", "griefer", "builder")
server: Server target — dev or prod
"""
profiles_path = CONFIG_DIR / "bot-profiles.yaml"
if not profiles_path.exists():
return "No bot profiles config found"
with open(profiles_path) as f:
data = yaml.safe_load(f)
profiles = data.get("profiles", {})
if profile not in profiles:
return f"Unknown profile: {profile}. Available: {list(profiles.keys())}"
bot = profiles[profile]
player = bot.get("player_name", f"Test{profile.title()}")
commands = bot.get("commands", [])
results = []
for cmd in commands:
mode = cmd.get("mode", "sudo")
text = cmd.get("text", "")
try:
resp = await _post("/v2/quick", {
"player": player,
"text": text,
"server": server,
"command_type": mode,
})
results.append({
"mode": mode,
"text": text,
"status": resp.get("status", "unknown"),
"response": (resp.get("response_text") or "")[:200],
"tools_used": [t.get("tool") for t in resp.get("tool_trace", [])],
"commands_executed": resp.get("commands_executed", []),
"error": None,
})
except Exception as e:
results.append({
"mode": mode,
"text": text,
"status": "error",
"response": "",
"tools_used": [],
"commands_executed": [],
"error": str(e),
})
# Brief pause between commands to avoid overwhelming
_time.sleep(1)
# Summarize
total = len(results)
passed = sum(1 for r in results if r["status"] == "completed" and not r["error"])
failed = sum(1 for r in results if r["status"] != "completed" or r["error"])
no_tools = sum(1 for r in results if r["status"] == "completed" and not r["tools_used"])
report = {
"profile": profile,
"player": player,
"timestamp": datetime.now().isoformat(),
"summary": {
"total": total,
"passed": passed,
"failed": failed,
"no_tools_used": no_tools,
},
"results": results,
}
# Save report
filename = f"{datetime.now().strftime('%Y%m%d-%H%M')}-{profile}.json"
report_path = PLAYTEST_DIR / filename
report_path.write_text(json.dumps(report, indent=2))
# Return summary
summary_lines = [f"Playtest: {profile} ({player}) — {passed}/{total} passed, {failed} failed, {no_tools} no-tool-use"]
for r in results:
status_icon = "OK" if r["status"] == "completed" and not r["error"] else "FAIL"
tool_str = ",".join(r["tools_used"]) if r["tools_used"] else "NO_TOOLS"
summary_lines.append(f" [{status_icon}] /{r['mode']} {r['text'][:50]}{tool_str}")
if r["error"]:
summary_lines.append(f" error: {r['error'][:100]}")
summary_lines.append(f"\nReport saved: {report_path}")
return "\n".join(summary_lines)
@mcp.tool()
def list_playtest_reports(limit: int = 10) -> str:
"""List recent playtest reports.
Args:
limit: Max number of reports to show (default 10)
"""
reports = sorted(PLAYTEST_DIR.glob("*.json"), reverse=True)[:limit]
if not reports:
return "No playtest reports found"
lines = []
for r in reports:
try:
data = json.loads(r.read_text())
s = data.get("summary", {})
lines.append(f" {r.name}: {data.get('profile')}{s.get('passed',0)}/{s.get('total',0)} passed")
except Exception:
lines.append(f" {r.name}: (unreadable)")
return f"Recent reports ({len(reports)}):\n" + "\n".join(lines)
# --- Diagnostics ---
@mcp.tool()
def read_interactions(date: str = "", limit: int = 20) -> str:
"""Read recent gateway interaction logs for analysis.
Args:
date: Date string YYYY-MM-DD (default: today)
limit: Max interactions to return (default 20)
"""
if not date:
date = datetime.now().strftime("%Y-%m-%d")
log_path = INTERACTION_DIR / f"{date}.jsonl"
if not log_path.exists():
return f"No interaction log for {date}"
lines = log_path.read_text().strip().split("\n")
recent = lines[-limit:]
results = []
for line in recent:
try:
d = json.loads(line)
tools = [t.get("tool") for t in d.get("tool_trace", [])]
results.append({
"player": d.get("player"),
"mode": d.get("mode"),
"message": (d.get("message") or "")[:80],
"status": d.get("status"),
"tools": tools,
"has_commands": bool(d.get("commands_executed")),
"response_preview": (d.get("response_text") or "")[:100],
})
except Exception:
continue
return json.dumps(results, indent=2)
@mcp.tool()
def analyze_errors(date: str = "", hours: int = 4) -> str:
"""Analyze recent gateway logs and interactions for error patterns.
Checks for: repeated errors, tool-use failures, timeouts, empty responses,
session poisoning (text-only responses with no tool calls).
Args:
date: Date string YYYY-MM-DD (default: today)
hours: How many hours back to analyze (default 4)
"""
issues = []
# Check gateway log for errors
log_path = Path("/tmp/mortdecai-gateway.log")
if log_path.exists():
try:
log_text = log_path.read_text()
error_lines = [l for l in log_text.split("\n") if "ERROR" in l or "Traceback" in l]
if error_lines:
issues.append({
"type": "gateway_errors",
"count": len(error_lines),
"recent": error_lines[-3:],
})
except Exception:
pass
# Check interaction logs
if not date:
date = datetime.now().strftime("%Y-%m-%d")
interaction_path = INTERACTION_DIR / f"{date}.jsonl"
if interaction_path.exists():
cutoff = _time.time() - (hours * 3600)
interactions = []
for line in interaction_path.read_text().strip().split("\n"):
try:
d = json.loads(line)
if d.get("timestamp", 0) > cutoff:
interactions.append(d)
except Exception:
continue
# Check for text-only responses (no tool calls)
no_tools = [i for i in interactions if not i.get("tool_trace") and i.get("status") == "completed"]
if no_tools:
issues.append({
"type": "no_tool_use",
"count": len(no_tools),
"description": "Completed responses with no tool calls (model responded with text only)",
"examples": [{"player": i.get("player"), "mode": i.get("mode"), "msg": i.get("message", "")[:60]} for i in no_tools[:3]],
})
# Check for errors/timeouts
errors = [i for i in interactions if i.get("status") in ("error", "timeout")]
if errors:
issues.append({
"type": "request_failures",
"count": len(errors),
"examples": [{"player": i.get("player"), "mode": i.get("mode"), "status": i.get("status"), "msg": i.get("message", "")[:60]} for i in errors[:3]],
})
# Check for empty responses
empty = [i for i in interactions if not i.get("response_text") and i.get("status") == "completed"]
if empty:
issues.append({
"type": "empty_responses",
"count": len(empty),
"examples": [{"player": i.get("player"), "mode": i.get("mode"), "msg": i.get("message", "")[:60]} for i in empty[:3]],
})
if not issues:
return f"No issues found in the last {hours} hours."
return json.dumps({"issues_found": len(issues), "issues": issues}, indent=2)
# --- Escalation ---
@mcp.tool()
def write_escalation(
title: str,
severity: str,
description: str,
evidence: str = "",
suggested_fix: str = "",
) -> str:
"""Write an escalation note for the architect session (Seth + Claude).
Use this when you find an issue you cannot or should not fix yourself.
Args:
title: Short title for the issue
severity: low, medium, high, critical
description: What's wrong and how you discovered it
evidence: Log lines, interaction data, or other evidence
suggested_fix: Your recommendation for how to fix it (optional)
"""
note = {
"title": title,
"severity": severity,
"description": description,
"evidence": evidence,
"suggested_fix": suggested_fix,
"timestamp": datetime.now().isoformat(),
"status": "open",
}
filename = f"{datetime.now().strftime('%Y%m%d-%H%M')}-{title[:40].replace(' ', '-').lower()}.json"
path = ESCALATION_DIR / filename
path.write_text(json.dumps(note, indent=2))
return f"Escalation written: {path}"
@mcp.tool()
def list_escalations(status: str = "open") -> str:
"""List escalation notes, optionally filtered by status.
Args:
status: Filter by status — open, resolved, all (default: open)
"""
files = sorted(ESCALATION_DIR.glob("*.json"), reverse=True)
if not files:
return "No escalations found"
notes = []
for f in files:
try:
data = json.loads(f.read_text())
if status != "all" and data.get("status") != status:
continue
notes.append(f" [{data.get('severity','?').upper()}] {data.get('title','')} ({f.name})")
except Exception:
continue
if not notes:
return f"No {status} escalations"
return f"Escalations ({len(notes)}):\n" + "\n".join(notes)
@mcp.tool()
def read_escalation(filename: str) -> str:
"""Read a specific escalation note.
Args:
filename: Escalation filename (from list_escalations)
"""
path = ESCALATION_DIR / filename
if not path.exists():
return f"Not found: {filename}"
return path.read_text()
# --- Logs ---
# --- Session Notes (persistent memory across runs) ---
@mcp.tool()
def write_note(topic: str, content: str) -> str:
"""Save a learning or observation that should persist across runs.
Use for: patterns discovered, things that work, things that don't,
provider quirks, player behavior patterns, diagnostic findings.
Keep notes focused and factual. One topic per note.
Args:
topic: Short topic key (e.g. "codex-tool-compliance", "griefer-patterns")
content: The observation or learning
"""
notes_dir = DATA_DIR / "notes"
notes_dir.mkdir(exist_ok=True)
note_path = notes_dir / f"{topic}.md"
entry = f"\n## {datetime.now().strftime('%Y-%m-%d %H:%M')}\n\n{content}\n"
if note_path.exists():
# Append to existing topic
with open(note_path, "a") as f:
f.write(entry)
else:
# New topic
with open(note_path, "w") as f:
f.write(f"# {topic}\n{entry}")
return f"Note saved: {note_path}"
@mcp.tool()
def read_notes(topic: str = "") -> str:
"""Read session notes. If topic is empty, lists all topics.
Args:
topic: Topic key to read, or empty to list all
"""
notes_dir = DATA_DIR / "notes"
if not notes_dir.exists():
return "No notes yet"
if not topic:
files = sorted(notes_dir.glob("*.md"))
if not files:
return "No notes yet"
lines = []
for f in files:
size = f.stat().st_size
lines.append(f" {f.stem} ({size} bytes)")
return f"Topics ({len(files)}):\n" + "\n".join(lines)
note_path = notes_dir / f"{topic}.md"
if not note_path.exists():
return f"No notes for topic: {topic}"
return note_path.read_text()
@mcp.tool()
def write_session_summary(summary: str) -> str:
"""Write a summary of this run's findings and actions.
Call this at the end of every scheduled run. Keeps a rolling log
of what happened, what was fixed, what was escalated.
Args:
summary: Brief summary of this run (findings, actions, escalations)
"""
log_path = DATA_DIR / "run-log.md"
entry = f"\n## {datetime.now().strftime('%Y-%m-%d %H:%M')}\n\n{summary}\n\n---\n"
with open(log_path, "a") as f:
f.write(entry)
# Keep run log under 50KB (trim oldest entries)
if log_path.stat().st_size > 50_000:
text = log_path.read_text()
sections = text.split("\n---\n")
trimmed = "\n---\n".join(sections[-(len(sections) // 2):])
log_path.write_text(trimmed)
return f"Session summary saved to {log_path}"
@mcp.tool()
def read_run_log(entries: int = 5) -> str:
"""Read recent run summaries.
Args:
entries: Number of recent entries to show (default 5)
"""
log_path = DATA_DIR / "run-log.md"
if not log_path.exists():
return "No run log yet — this is the first run"
text = log_path.read_text()
sections = text.split("\n---\n")
recent = sections[-entries:] if len(sections) > entries else sections
return "\n---\n".join(recent)
# --- Logs ---
@mcp.tool()
def gateway_logs(lines: int = 50) -> str:
"""Read recent gateway log output.
Args:
lines: Number of lines to read from the end (default 50)
"""
log_path = Path("/tmp/mortdecai-gateway.log")
if not log_path.exists():
return "No gateway log file found"
try:
result = subprocess.run(
["tail", f"-{lines}", str(log_path)],
capture_output=True, text=True, timeout=5,
)
return result.stdout or "Log file is empty"
except Exception as e:
return f"Error reading logs: {e}"
if __name__ == "__main__":
mcp.run(transport="stdio")