Files

294 lines
9.2 KiB
Python

#!/usr/bin/env python3
"""Workbench MCP Server — AI-driven hardware diagnostic tool.
Exposes 6 MCP tools for scaffolding, state management, and logging
of interactive hardware diagnostic web pages served over LAN.
"""
import asyncio
import json
import os
import socket
from datetime import datetime, timezone
from pathlib import Path
from aiohttp import web
from mcp.server.fastmcp import FastMCP
WORKBENCH_DIR = Path.home() / "workbench"
SCAFFOLD_HTML = Path(__file__).parent / "scaffold.html"
DEFAULT_PORT = 8070
SETHMUX_URL = os.environ.get("WORKBENCH_SETHMUX_URL", "https://mux.sethpc.xyz")
# Track active projects: {name: {"port": int, "runner": web.AppRunner, "ws_clients": set}}
active_projects: dict = {}
def get_lan_ip() -> str:
"""Get the machine's LAN IP address."""
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("192.168.0.1", 80))
ip = s.getsockname()[0]
s.close()
return ip
except Exception:
return "127.0.0.1"
def find_free_port(start: int = DEFAULT_PORT) -> int:
"""Find a free port starting from the given port."""
for port in range(start, start + 100):
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(("0.0.0.0", port))
s.close()
return port
except OSError:
continue
raise RuntimeError(f"No free port found in range {start}-{start+99}")
def now_iso() -> str:
return datetime.now(timezone.utc).astimezone().isoformat(timespec="seconds")
def project_path(name: str) -> Path:
return WORKBENCH_DIR / name
# --- HTTP/WebSocket server per project ---
async def ws_handler(request):
"""WebSocket endpoint for pushing state/log to browser."""
ws = web.WebSocketResponse()
await ws.prepare(request)
project_name = request.app["project_name"]
if project_name in active_projects:
active_projects[project_name]["ws_clients"].add(ws)
try:
async for msg in ws:
pass # Browser doesn't send us anything we need
finally:
if project_name in active_projects:
active_projects[project_name]["ws_clients"].discard(ws)
return ws
async def static_handler(request):
"""Serve files from the project directory."""
project_name = request.app["project_name"]
path = request.match_info.get("path", "index.html") or "index.html"
file_path = project_path(project_name) / path
if not file_path.exists():
return web.Response(status=404, text="Not found")
return web.FileResponse(file_path)
async def start_http_server(name: str, port: int) -> web.AppRunner:
"""Start an HTTP + WebSocket server for a project."""
app = web.Application()
app["project_name"] = name
app.router.add_get("/ws", ws_handler)
app.router.add_get("/{path:.*}", static_handler)
app.router.add_get("/", static_handler)
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner, "0.0.0.0", port)
await site.start()
return runner
async def broadcast_ws(name: str, message: dict):
"""Send a message to all WebSocket clients of a project."""
if name not in active_projects:
return
clients = active_projects[name]["ws_clients"]
dead = set()
for ws in clients:
try:
await ws.send_json(message)
except Exception:
dead.add(ws)
clients -= dead
# --- MCP Tools ---
mcp = FastMCP("workbench", instructions="Hardware diagnostic workbench — serve interactive diagnostic pages over LAN")
@mcp.tool()
async def workbench_scaffold(name: str, title: str, description: str = "") -> str:
"""Create a new workbench project with split-pane diagnostic page + sethmux terminal.
Returns the LAN URL to open on your phone.
"""
pdir = project_path(name)
if pdir.exists():
# Already exists — just start the server if not running
if name not in active_projects:
port = find_free_port()
runner = await start_http_server(name, port)
active_projects[name] = {"port": port, "runner": runner, "ws_clients": set()}
ip = get_lan_ip()
port = active_projects[name]["port"]
return json.dumps({"path": str(pdir), "url": f"http://{ip}:{port}"})
pdir.mkdir(parents=True)
(pdir / "assets").mkdir()
# Copy and fill scaffold
template = SCAFFOLD_HTML.read_text()
html = template.replace("{{TITLE}}", title)
html = html.replace("{{DESCRIPTION}}", description)
html = html.replace("{{SETHMUX_URL}}", SETHMUX_URL)
(pdir / "index.html").write_text(html)
# Init log files
(pdir / "session.md").write_text(f"# {title} — Diagnostic Session\n\nStarted: {now_iso()}\n\n")
(pdir / "session.jsonl").write_text("")
(pdir / "cost-log.jsonl").write_text(
json.dumps({"ts": now_iso(), "event": "session_start", "project": name}) + "\n"
)
(pdir / "state.json").write_text("{}")
# Start server
port = find_free_port()
runner = await start_http_server(name, port)
active_projects[name] = {"port": port, "runner": runner, "ws_clients": set()}
ip = get_lan_ip()
return json.dumps({"path": str(pdir), "url": f"http://{ip}:{port}"})
@mcp.tool()
async def workbench_state(project: str, state: str) -> str:
"""Push state update to the browser via WebSocket.
The state is arbitrary JSON — the AI decides the schema.
Include a 'template' field (HTML string) to replace the diagnostic content area.
Include 'styles' for CSS and 'script' for JS to execute.
"""
state_obj = json.loads(state)
# Save to disk
pdir = project_path(project)
if not pdir.exists():
return json.dumps({"error": f"Project '{project}' not found. Run workbench_scaffold first."})
(pdir / "state.json").write_text(json.dumps(state_obj, indent=2))
# Push to browser
await broadcast_ws(project, {"type": "state", "state": state_obj})
return json.dumps({"ok": True})
@mcp.tool()
async def workbench_log(project: str, entry: str, data: str = "{}") -> str:
"""Append a log entry to the session log. Shows in the browser log feed.
entry: Human-readable markdown string (e.g., "R412 measured 1.05M — drifted +16.7%, FAIL")
data: Optional JSON for the machine-readable log
"""
pdir = project_path(project)
if not pdir.exists():
return json.dumps({"error": f"Project '{project}' not found."})
ts = now_iso()
# Append to session.md
with open(pdir / "session.md", "a") as f:
f.write(f"\n### {ts}\n{entry}\n")
# Append to session.jsonl
data_obj = json.loads(data) if data else {}
data_obj["ts"] = ts
data_obj["entry"] = entry
with open(pdir / "session.jsonl", "a") as f:
f.write(json.dumps(data_obj) + "\n")
# Push to browser
await broadcast_ws(project, {"type": "log", "entry": entry})
return json.dumps({"ok": True})
@mcp.tool()
async def workbench_read_log(project: str, tail: int = 20) -> str:
"""Read recent session log entries so AI can resume a session."""
pdir = project_path(project)
if not pdir.exists():
return json.dumps({"error": f"Project '{project}' not found."})
jsonl_path = pdir / "session.jsonl"
if not jsonl_path.exists():
return json.dumps({"entries": []})
lines = jsonl_path.read_text().strip().split("\n")
recent = lines[-tail:] if len(lines) > tail else lines
entries = []
for line in recent:
if line.strip():
try:
entries.append(json.loads(line))
except json.JSONDecodeError:
entries.append({"raw": line})
return json.dumps({"entries": entries})
@mcp.tool()
async def workbench_list() -> str:
"""List all workbench projects and their status."""
if not WORKBENCH_DIR.exists():
return json.dumps({"projects": []})
projects = []
for d in sorted(WORKBENCH_DIR.iterdir()):
if d.is_dir():
info = {"name": d.name, "active": d.name in active_projects}
if d.name in active_projects:
ip = get_lan_ip()
port = active_projects[d.name]["port"]
info["url"] = f"http://{ip}:{port}"
info["ws_clients"] = len(active_projects[d.name]["ws_clients"])
projects.append(info)
return json.dumps({"projects": projects})
@mcp.tool()
async def workbench_stop(project: str) -> str:
"""Stop the HTTP/WebSocket server for a project and log session end."""
if project not in active_projects:
return json.dumps({"error": f"Project '{project}' is not running."})
# Log session end to cost-log
pdir = project_path(project)
jsonl_path = pdir / "session.jsonl"
entry_count = 0
if jsonl_path.exists():
entry_count = sum(1 for line in jsonl_path.read_text().strip().split("\n") if line.strip())
cost_entry = {
"ts": now_iso(),
"event": "session_end",
"project": project,
"log_entries": entry_count,
}
with open(pdir / "cost-log.jsonl", "a") as f:
f.write(json.dumps(cost_entry) + "\n")
# Shutdown server
runner = active_projects[project]["runner"]
await runner.cleanup()
del active_projects[project]
return json.dumps({"ok": True})
if __name__ == "__main__":
mcp.run(transport="stdio")