#!/usr/bin/env python3
"""
ingest_audit.py — Pull training audit logs from CT 644, filter, and merge into dataset.

Filters:
- Drops player-feedback entries (source == "player_feedback")
- Drops duplicate user_messages already in the dataset (or earlier in the batch)
- Drops empty responses (no commands AND no message)
- Drops examples where output language doesn't match input language
- Drops responses that look like a leaked system prompt
- Keeps multilingual examples where input/output languages match
- Tags all as validated=false, needs human review

Usage:
    python3 data/ingest_audit.py                  # pull, filter, merge
    python3 data/ingest_audit.py --dry-run        # show what would be added
    python3 data/ingest_audit.py --source dev     # only dev server
    python3 data/ingest_audit.py --source prod    # only prod server
"""

import argparse
import json
import re
import shlex
import subprocess
import sys
from pathlib import Path

ROOT = Path(__file__).resolve().parent.parent
DATASET = ROOT / "data" / "processed" / "seed_dataset.jsonl"
RAW_DIR = ROOT / "data" / "raw"

# Proxmox host and container that hold the audit logs.
PVE_HOST = "pve112"
CONTAINER_ID = "644"

# Per-step timeout (seconds) for the remote pull commands.
PULL_TIMEOUT = 30


# --- Language detection (simple, no dependencies) ---

# Compiled once: these are applied to every audit entry.
_LATIN_RE = re.compile(r'[a-zA-Z]')
_CJK_RE = re.compile(r'[\u4e00-\u9fff\u3040-\u309f\u30a0-\u30ff\uac00-\ud7af]')
_CYRILLIC_RE = re.compile(r'[\u0400-\u04ff]')
_ARABIC_RE = re.compile(r'[\u0600-\u06ff]')

# Matches ids produced by convert_entry, e.g. "audit-dev-0042".
_AUDIT_ID_RE = re.compile(r'^audit-[a-z]+-(\d+)$')


def has_cjk(text: str) -> bool:
    """Check if text contains CJK (Chinese/Japanese/Korean) characters."""
    return bool(_CJK_RE.search(text))


def has_cyrillic(text: str) -> bool:
    """Check if text contains Cyrillic characters."""
    return bool(_CYRILLIC_RE.search(text))


def has_arabic(text: str) -> bool:
    """Check if text contains Arabic characters."""
    return bool(_ARABIC_RE.search(text))


def detect_script(text: str) -> str:
    """Detect the dominant script of a text.

    Returns 'cjk', 'cyrillic' or 'arabic' when that script makes up more
    than 30% of the recognised letters (checked in that order), otherwise
    'latin'. Empty, very short (< 3 chars after stripping) or letter-free
    text is reported as 'latin'.
    """
    # NOTE(review): the 3-character floor means a two-character CJK input
    # is classified as latin — confirm this is intended.
    if not text or len(text.strip()) < 3:
        return "latin"

    # Count character types
    latin = len(_LATIN_RE.findall(text))
    cjk = len(_CJK_RE.findall(text))
    cyrillic = len(_CYRILLIC_RE.findall(text))
    arabic = len(_ARABIC_RE.findall(text))

    total = latin + cjk + cyrillic + arabic
    if total == 0:
        return "latin"

    if cjk / total > 0.3:
        return "cjk"
    if cyrillic / total > 0.3:
        return "cyrillic"
    if arabic / total > 0.3:
        return "arabic"
    return "latin"


def languages_match(input_text: str, output_message: str) -> bool:
    """Check if input and output are in the same script family.

    Commands are always English/latin, so we only check the message field.
    A missing or very short (< 3 chars) message always counts as a match.
    """
    if not output_message or len(output_message.strip()) < 3:
        return True  # no message to check
    return detect_script(input_text) == detect_script(output_message)


def is_system_prompt_leak(message: str) -> bool:
    """Detect if the model leaked its system prompt as the God message.

    Returns True when at least two of the known prompt fragments occur in
    the message (case-insensitive); a single hit is not treated as a leak.
    """
    if not message:
        return False
    leak_patterns = [
        "you are a Minecraft",
        "command translator",
        "Return ONLY JSON",
        "SYNTAX RULES",
        "RISK GRADIENT",
        "permission level",
        "CRITICAL RULES",
    ]
    lower = message.lower()
    return sum(1 for p in leak_patterns if p.lower() in lower) >= 2


# --- Remote data pulling ---

def pull_audit_log(source: str) -> list:
    """Pull training audit log from CT 644.

    Copies the log out of the container to the PVE host with ``pct pull``,
    then fetches it with ``scp`` into RAW_DIR and parses it as JSONL.

    Args:
        source: "dev" selects the dev log; any other value selects the
            prod log.

    Returns:
        The parsed entries. An empty list is returned (after printing a
        warning) if either remote step fails or times out. Lines that are
        not valid JSON are skipped.
    """
    if source == "dev":
        remote_path = "/var/log/mc_training_audit_dev.jsonl"
    else:
        remote_path = "/var/log/mc_training_audit.jsonl"

    staging_path = f"/tmp/audit_{source}.jsonl"
    local_path = RAW_DIR / f"training_audit_{source}_latest.jsonl"

    # ssh hands its command to the remote shell, so quote each piece there;
    # locally both steps run as argv lists with no shell involved.
    remote_cmd = " ".join(
        shlex.quote(part)
        for part in ("pct", "pull", CONTAINER_ID, remote_path, staging_path)
    )
    steps = [
        ["ssh", PVE_HOST, remote_cmd],
        ["scp", f"{PVE_HOST}:{staging_path}", str(local_path)],
    ]

    try:
        # scp cannot create the destination directory on a fresh checkout.
        RAW_DIR.mkdir(parents=True, exist_ok=True)
        for argv in steps:
            result = subprocess.run(
                argv, capture_output=True, text=True, timeout=PULL_TIMEOUT
            )
            if result.returncode != 0:
                print(f"Warning: could not pull {source} audit log: {result.stderr[:200]}")
                return []
    except (OSError, subprocess.SubprocessError) as e:
        # Covers a missing ssh/scp binary and subprocess.TimeoutExpired.
        print(f"Warning: pull failed for {source}: {e}")
        return []

    entries = []
    with open(local_path, encoding="utf-8") as f:
        for line in f:
            if line.strip():
                try:
                    entries.append(json.loads(line))
                except json.JSONDecodeError:
                    continue
    return entries


# --- Conversion ---

def convert_entry(entry: dict, idx: int, source_tag: str) -> dict:
    """Convert a training audit entry to dataset schema format.

    Args:
        entry: One parsed audit-log record.
        idx: Sequence number used to build the example id.
        source_tag: Origin of the record ("dev" or "prod"); embedded in the
            id, the reasoning text and the metadata.

    Returns:
        A dataset example dict. The message is kept only for the "god" and
        "god_system" modes (which also get soft scoring); every other mode
        gets ``message=None`` and strict scoring. Examples are always
        marked ``validated=False``.
    """
    # "or {}" / "or []" also cover keys that are present but JSON null.
    inp = entry.get("input") or {}
    out = entry.get("output") or {}
    mode = entry.get("mode", "sudo")
    player = entry.get("player", "unknown")

    # Prefer what was actually executed; fall back to what was generated.
    commands = out.get("commands_executed") or out.get("commands_generated") or []
    message = out.get("message", "")
    is_god_mode = mode in ("god", "god_system")

    return {
        "id": f"audit-{source_tag}-{idx:04d}",
        "source": "sudo_log" if mode == "sudo" else "prayer_log",
        "category": entry.get("category", "command_gen"),
        "input": {
            "user_message": inp.get("user_message") or "",
            "server_context": inp.get("server_context", {}),
        },
        "output": {
            "reasoning": f"Live {source_tag} interaction from {player} via {mode} mode.",
            "commands": commands,
            "message": message if is_god_mode else None,
            "safety_flags": [],
        },
        "metadata": {
            "difficulty": "medium",
            "validated": False,
            "extracted_from": f"training audit log ({source_tag})",
            "scoring_mode": "soft" if is_god_mode else "strict",
        },
    }


# --- Main ---

def _load_existing(dataset: Path) -> tuple:
    """Return (user_messages already in the dataset, next free audit index)."""
    messages = set()
    next_index = 0
    if not dataset.exists():
        return messages, next_index

    with open(dataset, encoding="utf-8") as f:
        for line in f:
            if not line.strip():
                continue
            ex = json.loads(line)
            messages.add((ex.get("input") or {}).get("user_message", ""))
            # Continue numbering after earlier ingests so ids stay unique.
            match = _AUDIT_ID_RE.match(str(ex.get("id", "")))
            if match:
                next_index = max(next_index, int(match.group(1)) + 1)
    return messages, next_index


def _filter_entries(tagged_entries: list, existing_messages: set, start_index: int = 0) -> tuple:
    """Apply the drop rules and convert survivors; returns (kept, dropped counts).

    ``existing_messages`` is updated in place with every kept message so
    that duplicates within the same batch are dropped too.
    """
    kept = []
    dropped = {"duplicate": 0, "empty": 0, "lang_mismatch": 0, "prompt_leak": 0, "feedback": 0}

    for entry, src in tagged_entries:
        # Skip feedback entries (bug_log)
        if entry.get("source") == "player_feedback":
            dropped["feedback"] += 1
            continue

        inp = entry.get("input") or {}
        out = entry.get("output") or {}
        user_msg = inp.get("user_message") or ""
        message = out.get("message", "")
        commands = out.get("commands_executed") or out.get("commands_generated") or []

        # Skip duplicates
        if user_msg in existing_messages:
            dropped["duplicate"] += 1
            continue

        # Skip empty (no commands AND no message)
        if not commands and not message:
            dropped["empty"] += 1
            continue

        # Skip language mismatch (Chinese output for English input, etc.)
        if not languages_match(user_msg, message):
            dropped["lang_mismatch"] += 1
            continue

        # Skip system prompt leaks
        if is_system_prompt_leak(message):
            dropped["prompt_leak"] += 1
            continue

        kept.append(convert_entry(entry, start_index + len(kept), src))
        existing_messages.add(user_msg)  # prevent within-batch dupes

    return kept, dropped


def main():
    """CLI entry point: pull audit logs, filter them, append to the dataset, validate."""
    parser = argparse.ArgumentParser(description="Ingest training audit logs")
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--source", choices=["dev", "prod", "both"], default="both")
    args = parser.parse_args()

    # Load existing messages to deduplicate, and the next unused id number
    existing_messages, start_index = _load_existing(DATASET)

    # Pull data
    sources = ["dev", "prod"] if args.source == "both" else [args.source]
    all_entries = []
    for src in sources:
        entries = pull_audit_log(src)
        print(f"Pulled {len(entries)} entries from {src}")
        all_entries.extend((e, src) for e in entries)

    # Filter and convert
    kept, dropped = _filter_entries(all_entries, existing_messages, start_index)

    print("\nResults:")
    print(f"  Total entries: {len(all_entries)}")
    print(f"  Kept: {len(kept)}")
    print("  Dropped:")
    for reason, count in sorted(dropped.items()):
        if count > 0:
            print(f"    {reason:20} {count}")

    if args.dry_run:
        print(f"\n[DRY RUN] Would append {len(kept)} examples to {DATASET}")
        for ex in kept[:5]:
            msg = ex["input"]["user_message"][:60]
            cmds = len(ex["output"]["commands"])
            print(f"  {ex['id']}: {msg} [{cmds} cmds]")
        return

    if not kept:
        print("Nothing to add.")
        return

    # Append to dataset. UTF-8 is explicit because ensure_ascii=False writes
    # raw CJK/Cyrillic/Arabic text, which a non-UTF-8 platform default
    # encoding cannot represent.
    DATASET.parent.mkdir(parents=True, exist_ok=True)
    with open(DATASET, "a", encoding="utf-8") as f:
        f.writelines(json.dumps(ex, ensure_ascii=False) + "\n" for ex in kept)

    print(f"\nAppended {len(kept)} examples to {DATASET}")

    # Validate
    result = subprocess.run(
        [sys.executable, str(ROOT / "data" / "validate_dataset.py"), str(DATASET)],
        capture_output=True, text=True
    )
    print(result.stdout)
    if result.returncode != 0:
        print("WARNING: Validation failed!")
        # stdout was already shown above; the error details are on stderr.
        print(result.stderr)


if __name__ == "__main__":
    main()