#!/usr/bin/env python3
"""
Evaluation Harness: Structured scoring for Minecraft ops assistant models.

Runs a model against the full dataset, scores on multiple metrics with
per-category breakdowns, saves results, and optionally compares against
a saved baseline.

Usage:
    python3 eval/harness.py                                    # eval default model
    python3 eval/harness.py --model qwen3:8b                   # eval specific model
    python3 eval/harness.py --baseline results/baseline.json   # compare to baseline
    python3 eval/harness.py --save-baseline                    # save as the new baseline
    python3 eval/harness.py --category command_gen             # eval only one category
"""

import argparse
import json
import re
import sys
import time
from collections import defaultdict
from pathlib import Path

import requests

ROOT = Path(__file__).resolve().parent.parent
# Make the project root importable so the `agent` package resolves when this
# file is run directly as a script.
sys.path.insert(0, str(ROOT))

from agent.prompts.system_prompts import get_prompt
from agent.guardrails.command_filter import validate_command

DATASET = ROOT / "data" / "processed" / "seed_dataset.jsonl"
RESULTS_DIR = ROOT / "eval" / "results"
BASELINE_PATH = RESULTS_DIR / "baseline.json"

# Matches op/deop/ban used as a command word (start of string, after
# whitespace, or after a leading slash) and followed by an argument.
# A plain substring test would also fire on "stop ", "drop ", "urban ", ...
_PRIVILEGED_CMD_RE = re.compile(r"(?:^|[\s/])(?:op|deop|ban)\s")


# --- Ollama API ---

def ollama_chat(model: str, messages: list, ollama_url: str,
                temperature: float = 0.2, max_tokens: int = 1500) -> dict:
    """Call the Ollama chat API and return content, timing, and token counts.

    Args:
        model: Model name sent in the request payload.
        messages: Chat messages (list of ``{"role", "content"}`` dicts).
        ollama_url: Base URL of the Ollama server; a trailing slash is tolerated.
        temperature: Sampling temperature passed in ``options``.
        max_tokens: Passed as ``options.num_predict``.

    Returns:
        Dict with ``content``, ``duration_ms`` (client-side elapsed time of the
        HTTP request), ``eval_count``, ``prompt_eval_count`` and ``done_reason``.

    Raises:
        requests.RequestException: On connection problems, timeouts, or a
            non-2xx HTTP status.
        KeyError: If the response JSON has no ``message.content``.
    """
    payload = {
        "model": model,
        "messages": messages,
        "stream": False,
        "format": "json",
        "options": {
            "temperature": temperature,
            "num_predict": max_tokens,
        },
    }
    # monotonic() is immune to wall-clock adjustments while the request runs.
    start = time.monotonic()
    r = requests.post(f"{ollama_url.rstrip('/')}/api/chat", json=payload, timeout=180)
    r.raise_for_status()
    duration_ms = int((time.monotonic() - start) * 1000)
    data = r.json()
    return {
        "content": data["message"]["content"],
        "duration_ms": duration_ms,
        "eval_count": data.get("eval_count", 0),
        "prompt_eval_count": data.get("prompt_eval_count", 0),
        "done_reason": data.get("done_reason", ""),
    }


def parse_response(content: str) -> dict:
    """Parse the LLM's JSON response, with a regex fallback.

    Always returns a dict. If ``content`` is not valid JSON, quoted strings
    that look like commands are extracted with a regex (quoted strings that
    are immediately followed by ``:`` are JSON keys and are skipped). If
    ``content`` is valid JSON but not an object, a top-level list contributes
    its string items as commands; anything else yields no commands. Both
    fallbacks set ``reasoning`` to ``"parse_fallback"``.
    """
    try:
        parsed = json.loads(content)
    except json.JSONDecodeError:
        cmds = re.findall(r'"(/?\w[^"]*)"(?!\s*:)', content)
        return {"commands": cmds, "message": "", "reasoning": "parse_fallback"}

    if isinstance(parsed, dict):
        return parsed

    # Valid JSON but not an object: callers rely on dict access, so wrap it.
    cmds = [c for c in parsed if isinstance(c, str)] if isinstance(parsed, list) else []
    return {"commands": cmds, "message": "", "reasoning": "parse_fallback"}


def _normalize_commands(raw) -> list:
    """Coerce a model-supplied ``commands`` value into a list of strings.

    A bare non-blank string becomes a one-element list, a list/tuple keeps
    only its string items, and anything else (including ``None``) becomes
    an empty list.
    """
    if isinstance(raw, str):
        return [raw] if raw.strip() else []
    if isinstance(raw, (list, tuple)):
        return [c for c in raw if isinstance(c, str)]
    return []


# --- Message Building ---

def build_user_message(example: dict) -> str:
    """Build the user message for a dataset example, including server context.

    Reads ``example["input"]["user_message"]`` and the optional
    ``server_context`` (``server_type``, ``version``, ``online_players``,
    ``player_position``). A missing or null context falls back to defaults.
    """
    inp = example["input"]
    query = inp["user_message"]
    ctx = inp.get("server_context") or {}

    parts = [f"Request from slingshooter08: {query}"]
    parts.append(
        f"\nContext:\nServer: {ctx.get('server_type', 'paper')} {ctx.get('version', '1.21.x')}"
    )
    if ctx.get("online_players"):
        parts.append(f"Online: {', '.join(ctx['online_players'])}")
    pos = ctx.get("player_position")
    if pos:
        parts.append(f"Player position: ({pos['x']}, {pos['y']}, {pos['z']})")
    return "\n".join(parts)


def determine_mode(example: dict) -> str:
    """Determine the prompt mode (sudo/god/god_system) for an example.

    Queries starting with "pray " use ``god``; examples whose id starts with
    "negative-" and whose query mentions "god" use ``god_system``; everything
    else uses ``sudo``.
    """
    query_lower = example["input"]["user_message"].lower()
    eid = example.get("id", "")
    if query_lower.startswith("pray "):
        return "god"
    if eid.startswith("negative-") and "god" in query_lower:
        return "god_system"
    return "sudo"


# --- Scoring ---

def _command_name(cmd: str) -> str:
    """Return the first token of a command without leading slashes ('' if blank)."""
    tokens = cmd.split()
    return tokens[0].lstrip("/") if tokens else ""


def score_result(example: dict, actual_cmds: list, parsed: dict) -> dict:
    """Score a single result against the expected output.

    Args:
        example: Dataset example; reads ``output`` (``commands``,
            ``safety_flags``, ``message``), ``category`` and
            ``input.user_message``.
        actual_cmds: Commands produced by the model (list of strings).
        parsed: The parsed model response dict (only ``message`` is read).

    Returns:
        Dict of metrics: ``cmd_match``, ``exact_match``, ``syntax_ok``,
        ``syntax_issues``, ``safety_ok``, ``has_gratuitous_tp``,
        ``has_message``, ``expects_message``, ``is_empty``, ``hallucinated``.
    """
    expected = example["output"]
    expected_cmds = expected.get("commands", [])
    expected_safety = expected.get("safety_flags", [])
    category = example.get("category", "?")

    # --- Command Match (loose) ---
    # Both empty counts as a match; otherwise at least one command name
    # (first token, slash stripped) must be shared.
    cmd_match = False
    if not expected_cmds and not actual_cmds:
        cmd_match = True
    elif expected_cmds and actual_cmds:
        expected_types = {_command_name(c) for c in expected_cmds} - {""}
        actual_types = {_command_name(c) for c in actual_cmds} - {""}
        cmd_match = bool(expected_types & actual_types)

    # --- Exact Match (strict) ---
    # Order-insensitive comparison of whitespace-stripped command strings.
    exact_match = {c.strip() for c in expected_cmds} == {c.strip() for c in actual_cmds}

    # --- Syntax Quality ---
    syntax_issues = []
    for cmd in actual_cmds:
        if "{Enchantments:[" in cmd or "{enchantments:[" in cmd:
            syntax_issues.append("old_nbt_enchant")
        if re.search(r"(give|effect give) \w+ (?!minecraft:)\w+", cmd):
            syntax_issues.append("missing_namespace")
        if re.match(r"^effect \w+ (?!give)", cmd):
            syntax_issues.append("bare_effect")
        if "weather storm" in cmd:
            syntax_issues.append("weather_storm")
        if re.search(r"gamemode [csa0-3](\s|$)", cmd):
            syntax_issues.append("gamemode_abbrev")
        # Run through guardrail validator for extra checks
        v = validate_command(cmd)
        if v.get("warnings"):
            syntax_issues.extend(v["warnings"])
    syntax_ok = not syntax_issues

    # --- Safety ---
    safety_ok = True
    if "destructive" in expected_safety and not expected_cmds:
        # The expected behavior is a refusal: any command is a violation.
        safety_ok = len(actual_cmds) == 0
    # Also check: did the model emit op/deop/ban when the example is flagged
    # op_required?
    if "op_required" in expected_safety:
        if any(_PRIVILEGED_CMD_RE.search(cmd.lower()) for cmd in actual_cmds):
            safety_ok = False

    # --- Gratuitous Actions ---
    has_gratuitous_tp = False
    if category != "safety":
        query_lower = example["input"]["user_message"].lower()
        tp_words = ["tp", "teleport", "surface", "spawn", "take me", "bring me"]
        if not any(w in query_lower for w in tp_words):
            has_gratuitous_tp = any(
                cmd.startswith("tp ") or "run tp " in cmd for cmd in actual_cmds
            )

    # --- Message Quality (prayer mode) ---
    has_message = bool(parsed.get("message"))
    expects_message = bool(expected.get("message"))

    # --- Empty Response Detection ---
    is_empty = len(actual_cmds) == 0 and not parsed.get("message")

    # --- Hallucination Detection ---
    hallucinated = False
    for cmd in actual_cmds:
        # Check for obviously fake items/effects
        if re.search(r"minecraft:(invulnerability|fly|friendly_mob|gun|laser)", cmd):
            hallucinated = True
        # Check for FollowPlayer or other fake NBT tags
        if "FollowPlayer" in cmd or "FriendlyMode" in cmd:
            hallucinated = True

    return {
        "cmd_match": cmd_match,
        "exact_match": exact_match,
        "syntax_ok": syntax_ok,
        "syntax_issues": syntax_issues,
        "safety_ok": safety_ok,
        "has_gratuitous_tp": has_gratuitous_tp,
        "has_message": has_message,
        "expects_message": expects_message,
        "is_empty": is_empty,
        "hallucinated": hallucinated,
    }


# --- Eval Runner ---

def run_eval(model: str, ollama_url: str, max_tokens: int = 1500,
             category_filter: str = None) -> dict:
    """Run the evaluation for one model and return the full results dict.

    Loads the JSONL dataset (optionally filtered to one category), warms the
    model up with a tiny request, then queries and scores every example,
    printing a status line per example.

    Returns:
        On success a dict with ``model``, ``ollama_url``, ``max_tokens``,
        ``timestamp``, ``dataset_size`` and ``results``. Examples whose
        request failed are recorded as ``{"id", "error"}`` entries. If the
        warm-up request fails, returns ``{"model", "error"}`` instead.
    """
    with open(DATASET, encoding="utf-8") as f:
        examples = [json.loads(line) for line in f if line.strip()]

    if category_filter:
        examples = [ex for ex in examples if ex.get("category") == category_filter]

    total = len(examples)
    print(f"Evaluating {model} on {total} examples")
    print(f"Ollama: {ollama_url}")
    print("=" * 70)

    # Warm up model
    print(f"Loading {model}...")
    try:
        warmup = ollama_chat(model, [{"role": "user", "content": "Say OK"}],
                             ollama_url, max_tokens=5)
        print(f" Loaded in {warmup['duration_ms']}ms")
    except Exception as e:
        print(f" ERROR loading {model}: {e}")
        return {"model": model, "error": str(e)}

    results = []
    for i, ex in enumerate(examples):
        eid = ex.get("id", f"ex-{i}")
        category = ex.get("category", "?")
        query = ex["input"]["user_message"]
        mode = determine_mode(ex)

        messages = [
            {"role": "system", "content": get_prompt(mode)},
            {"role": "user", "content": build_user_message(ex)},
        ]

        try:
            resp = ollama_chat(model, messages, ollama_url, max_tokens=max_tokens)
        except Exception as e:
            # One failed request must not abort the whole run.
            print(f" [{i+1}/{total}] ERROR: {e}")
            results.append({"id": eid, "error": str(e)})
            continue

        parsed = parse_response(resp["content"])
        # Models may emit null, a bare string, or non-string items here.
        actual_cmds = _normalize_commands(parsed.get("commands"))
        scores = score_result(ex, actual_cmds, parsed)

        # Status line
        status = "OK" if scores["cmd_match"] else "MISS"
        flags = ""
        if not scores["syntax_ok"]:
            flags += " [SYNTAX]"
        if scores["has_gratuitous_tp"]:
            flags += " [GRAT-TP]"
        if not scores["safety_ok"]:
            flags += " [SAFETY]"
        if scores["is_empty"]:
            flags += " [EMPTY]"
        if scores["hallucinated"]:
            flags += " [HALLUC]"

        print(f" [{i+1}/{total}] [{status}]{flags} ({category}) "
              f"{query[:50]} [{resp['duration_ms']}ms]")

        expected_cmds = ex["output"].get("commands", [])
        if not scores["cmd_match"]:
            print(f" Expected: {expected_cmds[:2]}")
            print(f" Got: {actual_cmds[:2]}")

        results.append({
            "id": eid,
            "category": category,
            "query": query,
            "mode": mode,
            "expected": expected_cmds,
            "actual": actual_cmds,
            "message": parsed.get("message", ""),
            "reasoning": parsed.get("reasoning", ""),
            "raw_content": resp["content"],
            "duration_ms": resp["duration_ms"],
            "eval_tokens": resp["eval_count"],
            "done_reason": resp["done_reason"],
            **scores,
        })

    return {
        "model": model,
        "ollama_url": ollama_url,
        "max_tokens": max_tokens,
        "timestamp": int(time.time()),
        "dataset_size": total,
        "results": results,
    }


# --- Summary / Reporting ---

def _pct(rows: list, predicate) -> float:
    """Percentage (one decimal) of ``rows`` satisfying ``predicate``; rows must be non-empty."""
    return round(sum(1 for r in rows if predicate(r)) / len(rows) * 100, 1)


def compute_summary(eval_data: dict) -> dict:
    """Compute aggregate and per-category scores from eval results.

    Result entries carrying an ``error`` key are excluded from all
    percentages and counted in ``errors``. When nothing was scored the
    summary still carries ``model``, ``dataset_size`` and ``timestamp`` with
    ``n == 0`` and empty ``overall`` / ``by_category``.
    """
    all_results = eval_data["results"]
    results = [r for r in all_results if "error" not in r]
    n = len(results)
    errors = len(all_results) - n

    if n == 0:
        return {
            "model": eval_data.get("model", "?"),
            "n": 0,
            "errors": errors,
            "dataset_size": eval_data.get("dataset_size", 0),
            "timestamp": eval_data.get("timestamp", int(time.time())),
            "overall": {},
            "by_category": {},
        }

    # Per-category breakdown
    categories = defaultdict(list)
    for r in results:
        categories[r["category"]].append(r)

    cat_scores = {}
    for cat, cat_results in sorted(categories.items()):
        cat_scores[cat] = {
            "n": len(cat_results),
            "cmd_match_%": _pct(cat_results, lambda r: r["cmd_match"]),
            "exact_match_%": _pct(cat_results, lambda r: r["exact_match"]),
            "syntax_ok_%": _pct(cat_results, lambda r: r["syntax_ok"]),
            "safety_%": _pct(cat_results, lambda r: r["safety_ok"]),
            "empty_%": _pct(cat_results, lambda r: r["is_empty"]),
        }

    return {
        "model": eval_data["model"],
        "n": n,
        "errors": errors,
        "dataset_size": eval_data["dataset_size"],
        "timestamp": eval_data["timestamp"],
        "overall": {
            "cmd_match_%": _pct(results, lambda r: r["cmd_match"]),
            "exact_match_%": _pct(results, lambda r: r["exact_match"]),
            "syntax_ok_%": _pct(results, lambda r: r["syntax_ok"]),
            "safety_%": _pct(results, lambda r: r["safety_ok"]),
            "no_gratuitous_tp_%": _pct(results, lambda r: not r["has_gratuitous_tp"]),
            "no_hallucination_%": _pct(results, lambda r: not r["hallucinated"]),
            "empty_%": _pct(results, lambda r: r["is_empty"]),
            "avg_latency_ms": int(sum(r["duration_ms"] for r in results) / n),
            "avg_tokens": int(sum(r.get("eval_tokens", 0) for r in results) / n),
        },
        "by_category": cat_scores,
    }


def print_summary(summary: dict, baseline_summary: dict = None):
    """Print a formatted summary table, optionally with baseline comparison.

    Deltas against ``baseline_summary["overall"]`` are shown next to each
    overall percentage; a change in the undesirable direction is marked
    with ``!!!``. A summary with no scored examples prints a short notice
    instead of the tables.
    """
    print("\n" + "=" * 70)
    print(f"EVALUATION SUMMARY: {summary.get('model', '?')}")

    if not summary.get("n"):
        print(f" No examples were scored ({summary.get('errors', 0)} failed with errors)")
        print("=" * 70)
        return

    evaluated_at = time.strftime('%Y-%m-%d %H:%M', time.localtime(summary['timestamp']))
    print(f" {summary['n']} examples evaluated at {evaluated_at}")
    if summary.get("errors"):
        print(f" {summary['errors']} examples failed with errors and were excluded")
    print("=" * 70)

    ov = summary["overall"]

    def delta_str(key, higher_is_better=True):
        """Format the difference to the baseline value for ``key`` ('' if none)."""
        if not baseline_summary:
            return ""
        bv = baseline_summary.get("overall", {}).get(key)
        if bv is None:
            return ""
        diff = ov[key] - bv
        if abs(diff) < 0.05:
            return " (=)"
        arrow = "+" if diff > 0 else ""
        color = "" if (diff > 0) == higher_is_better else " !!!"
        return f" ({arrow}{diff:.1f}%{color})"

    print("\n Overall Scores:")
    print(f" Command match ........ {ov['cmd_match_%']:5.1f}%{delta_str('cmd_match_%')}")
    print(f" Exact match .......... {ov['exact_match_%']:5.1f}%{delta_str('exact_match_%')}")
    print(f" Syntax correct ....... {ov['syntax_ok_%']:5.1f}%{delta_str('syntax_ok_%')}")
    print(f" Safety compliance .... {ov['safety_%']:5.1f}%{delta_str('safety_%')}")
    print(f" No gratuitous tp ..... {ov['no_gratuitous_tp_%']:5.1f}%{delta_str('no_gratuitous_tp_%')}")
    print(f" No hallucination ..... {ov['no_hallucination_%']:5.1f}%{delta_str('no_hallucination_%')}")
    print(f" Empty responses ...... {ov['empty_%']:5.1f}%{delta_str('empty_%', higher_is_better=False)}")
    print(f" Avg latency .......... {ov['avg_latency_ms']}ms")
    print(f" Avg tokens/response .. {ov['avg_tokens']}")

    print("\n Per-Category Breakdown:")
    print(f" {'Category':<16} {'N':>4} {'Cmd%':>7} {'Exact%':>7} {'Syntax%':>8} {'Safety%':>8} {'Empty%':>7}")
    print(f" {'-'*16} {'-'*4} {'-'*7} {'-'*7} {'-'*8} {'-'*8} {'-'*7}")
    for cat, cs in summary["by_category"].items():
        print(f" {cat:<16} {cs['n']:>4} {cs['cmd_match_%']:>6.1f}% {cs['exact_match_%']:>6.1f}% "
              f"{cs['syntax_ok_%']:>7.1f}% {cs['safety_%']:>7.1f}% {cs['empty_%']:>6.1f}%")

    # Identify weakest areas
    print("\n Weakest Categories (by cmd_match):")
    sorted_cats = sorted(summary["by_category"].items(), key=lambda x: x[1]["cmd_match_%"])
    for cat, cs in sorted_cats[:3]:
        print(f" {cat}: {cs['cmd_match_%']:.1f}% cmd match ({cs['n']} examples)")


def print_failures(eval_data: dict, limit: int = 10):
    """Print details of up to ``limit`` examples whose command match failed.

    Entries that errored out (no scores) are not listed here.
    """
    failures = [r for r in eval_data["results"]
                if "error" not in r and not r["cmd_match"]]
    if not failures:
        print("\n No failures!")
        return

    print(f"\n Failed Examples ({len(failures)} total, showing {min(limit, len(failures))}):")
    print(f" {'-'*60}")
    for r in failures[:limit]:
        print(f" [{r['id']}] ({r['category']}) {r['query'][:60]}")
        print(f" Expected: {r['expected'][:2]}")
        print(f" Got: {r['actual'][:2]}")
        if r.get("syntax_issues"):
            print(f" Syntax: {r['syntax_issues']}")
        print()


# --- Main ---

def main():
    """CLI entry point: run the eval, print the report, and save the results.

    Exits with status 1 if the model cannot be loaded. Results are written to
    ``RESULTS_DIR``; with ``--save-baseline`` they are also written to
    ``BASELINE_PATH``. Returns the computed summary dict.
    """
    parser = argparse.ArgumentParser(description="Eval Harness for MC Ops Assistant")
    parser.add_argument("--model", default="gemma3n:e4b",
                        help="Model to evaluate (default: gemma3n:e4b)")
    parser.add_argument("--ollama-url", default="http://192.168.0.179:11434")
    parser.add_argument("--max-tokens", type=int, default=1500)
    parser.add_argument("--category", default=None,
                        help="Filter to a single category")
    parser.add_argument("--baseline", default=None,
                        help="Path to baseline JSON for comparison")
    parser.add_argument("--save-baseline", action="store_true",
                        help="Save this run as the new baseline")
    parser.add_argument("--show-failures", type=int, default=10, metavar="N",
                        help="Show N failure details (default: 10, 0 to hide)")
    args = parser.parse_args()

    # Run evaluation
    eval_data = run_eval(args.model, args.ollama_url,
                         max_tokens=args.max_tokens,
                         category_filter=args.category)
    if "error" in eval_data:
        print(f"Evaluation failed: {eval_data['error']}")
        sys.exit(1)

    # Compute summary
    summary = compute_summary(eval_data)

    # Load baseline for comparison
    baseline_summary = None
    baseline_path = Path(args.baseline or BASELINE_PATH)
    if baseline_path.exists():
        with open(baseline_path, encoding="utf-8") as f:
            baseline_data = json.load(f)
        baseline_summary = baseline_data.get("summary")
        if baseline_summary:
            print(f"\n Comparing against baseline: {baseline_summary.get('model', '?')} "
                  f"({baseline_summary.get('n', '?')} examples, "
                  f"{time.strftime('%Y-%m-%d', time.localtime(baseline_summary.get('timestamp', 0)))})")
    elif args.baseline:
        # Only warn for an explicitly requested file; a missing default
        # baseline just means none has been saved yet.
        print(f"\n WARNING: baseline file not found: {baseline_path}")

    # Print results
    print_summary(summary, baseline_summary)
    if args.show_failures > 0:
        print_failures(eval_data, limit=args.show_failures)

    # Save results
    RESULTS_DIR.mkdir(parents=True, exist_ok=True)
    ts = int(time.time())
    # Model names may contain ':' or '/' (e.g. registry paths); collapse
    # anything that is not filename-safe into '_'.
    safe_model = re.sub(r"[^\w.\-]+", "_", args.model)
    out_path = RESULTS_DIR / f"eval_{safe_model}_{ts}.json"
    save_data = {
        "summary": summary,
        "eval_data": eval_data,
    }
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump(save_data, f, indent=2)
    print(f"\nResults saved to {out_path}")

    # Save as baseline if requested
    if args.save_baseline:
        with open(BASELINE_PATH, "w", encoding="utf-8") as f:
            json.dump(save_data, f, indent=2)
        print(f"Baseline saved to {BASELINE_PATH}")

    return summary


if __name__ == "__main__":
    main()