Files
Mortdecai/data/ingest_audit.py
Seth 13debc8a59 Add audit log ingestion pipeline with language/leak filtering
data/ingest_audit.py:
- Pulls training audit logs from CT 644 (dev + prod)
- Filters: language mismatch (Chinese output for English input), system
  prompt leaks, empty responses, duplicates
- Keeps multilingual examples where input/output languages match
- Converts to dataset schema, appends to seed_dataset.jsonl
- --dry-run to preview, --source dev/prod/both

Tested: 237 entries → 112 kept (16 lang mismatch, 10 prompt leak, 86 dupe, 13 empty dropped)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-18 17:58:52 -04:00

273 lines
8.7 KiB
Python

#!/usr/bin/env python3
"""
ingest_audit.py — Pull training audit logs from CT 644, filter, and merge into dataset.
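Filters:
- Skips player feedback entries (source == "player_feedback")
- Drops examples where output language doesn't match input language
- Drops system prompt leaks (the model echoed its own instructions as the message)
- Drops empty responses (no commands AND no message)
- Drops duplicate user_messages already in the dataset or seen earlier in the same batch
- Keeps multilingual examples where input/output languages match
- Tags all as validated=false, needs human review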
Usage:
python3 data/ingest_audit.py # pull, filter, merge
python3 data/ingest_audit.py --dry-run # show what would be added
python3 data/ingest_audit.py --source dev # only dev server
python3 data/ingest_audit.py --source prod # only prod server
"""
import argparse
import json
import re
import subprocess
import sys
from pathlib import Path
# Repository root: two directory levels above this script's resolved location.
ROOT = Path(__file__).resolve().parent.parent
# JSONL dataset file that accepted examples are appended to.
DATASET = ROOT.joinpath("data", "processed", "seed_dataset.jsonl")
# Local directory that receives the audit logs copied from the remote host.
RAW_DIR = ROOT.joinpath("data", "raw")
# --- Language detection (simple, no dependencies) ---
def has_cjk(text: str) -> bool:
    """Return True if *text* contains at least one CJK character.

    The character class covers U+4E00-U+9FFF (CJK unified ideographs),
    U+3040-U+309F (hiragana), U+30A0-U+30FF (katakana) and
    U+AC00-U+D7AF (hangul syllables).
    """
    cjk_class = r'[\u4e00-\u9fff\u3040-\u309f\u30a0-\u30ff\uac00-\ud7af]'
    return re.search(cjk_class, text) is not None
def has_cyrillic(text: str) -> bool:
    """Return True if *text* contains at least one character in U+0400-U+04FF."""
    cyrillic_class = r'[\u0400-\u04ff]'
    return re.search(cyrillic_class, text) is not None
def has_arabic(text: str) -> bool:
    """Return True if *text* contains at least one character in U+0600-U+06FF."""
    arabic_class = r'[\u0600-\u06ff]'
    return re.search(arabic_class, text) is not None
def detect_script(text: str, threshold: float = 0.3) -> str:
    """Classify *text* by its dominant script.

    Only ASCII letters and characters in the CJK, Cyrillic and Arabic ranges
    are counted; digits, punctuation, whitespace and any other characters
    are ignored.

    Args:
        text: The text to classify. Falsy values and texts shorter than
            three characters after stripping are treated as Latin.
        threshold: Share of the counted characters that a non-Latin script
            must exceed to be reported. Defaults to 0.3.

    Returns:
        One of 'latin', 'cjk', 'cyrillic' or 'arabic'. There is no 'mixed'
        result: when several scripts exceed the threshold, the first one in
        the order cjk, cyrillic, arabic wins, and 'latin' is the fallback.
    """
    if not text or len(text.strip()) < 3:
        return "latin"

    # Listed in priority order; the first script over the threshold wins.
    non_latin = (
        ("cjk", r'[\u4e00-\u9fff\u3040-\u309f\u30a0-\u30ff\uac00-\ud7af]'),
        ("cyrillic", r'[\u0400-\u04ff]'),
        ("arabic", r'[\u0600-\u06ff]'),
    )
    counts = {name: len(re.findall(pattern, text)) for name, pattern in non_latin}
    total = len(re.findall(r'[a-zA-Z]', text)) + sum(counts.values())

    # Nothing countable (e.g. only digits or punctuation): default to Latin.
    if total == 0:
        return "latin"

    for name, _ in non_latin:
        if counts[name] / total > threshold:
            return name
    return "latin"
def languages_match(input_text: str, output_message: str) -> bool:
    """Tell whether the reply is written in the same script family as the input.

    Only the message field is compared, because commands are always
    English/latin. A missing message, or one shorter than three characters
    after stripping, is treated as matching since there is nothing to check.
    """
    if output_message and len(output_message.strip()) >= 3:
        return detect_script(input_text) == detect_script(output_message)
    return True  # no message to check
def is_system_prompt_leak(message: str) -> bool:
    """Flag a message that looks like the model echoed its system prompt.

    The message is compared case-insensitively against a fixed list of marker
    phrases; it counts as a leak when at least two distinct markers occur.
    An empty or missing message is never a leak.
    """
    if not message:
        return False

    markers = (
        "you are a Minecraft",
        "command translator",
        "Return ONLY JSON",
        "SYNTAX RULES",
        "RISK GRADIENT",
        "permission level",
        "CRITICAL RULES",
    )
    haystack = message.lower()
    hits = 0
    for marker in markers:
        if marker.lower() in haystack:
            hits += 1
    # A single marker is too weak a signal; require two.
    return hits >= 2
# --- Remote data pulling ---
def pull_audit_log(source: str) -> list:
    """Copy a training audit log out of container 644 and parse it.

    The log is first pulled from the container onto host ``pve112`` with
    ``pct pull`` and then copied with ``scp`` into ``RAW_DIR`` as
    ``training_audit_<source>_latest.jsonl``.

    Args:
        source: ``"dev"`` selects the dev audit log; any other value selects
            the default (prod) log. The value is also used in the staging
            and local file names.

    Returns:
        The parsed JSON objects, one per non-blank line. Lines that are not
        valid JSON, or that do not decode to an object, are skipped. An empty
        list is returned (after printing a warning) when the transfer fails.
    """
    if source == "dev":
        remote_path = "/var/log/mc_training_audit_dev.jsonl"
    else:
        remote_path = "/var/log/mc_training_audit.jsonl"
    staging_path = f"/tmp/audit_{source}.jsonl"
    local_path = RAW_DIR / f"training_audit_{source}_latest.jsonl"

    # Argument lists instead of a shell string: nothing is re-parsed by a
    # local shell, so path contents cannot change the command being run.
    steps = (
        ["ssh", "pve112", f"pct pull 644 {remote_path} {staging_path}"],
        ["scp", f"pve112:{staging_path}", str(local_path)],
    )
    try:
        # scp cannot create the destination directory itself.
        RAW_DIR.mkdir(parents=True, exist_ok=True)
        for argv in steps:
            # NOTE: the 30 s timeout now applies to each step separately.
            result = subprocess.run(argv, capture_output=True, text=True, timeout=30)
            if result.returncode != 0:
                print(f"Warning: could not pull {source} audit log: {result.stderr[:200]}")
                return []
    except (OSError, subprocess.SubprocessError) as e:
        # Covers a missing ssh/scp binary, filesystem errors and timeouts.
        print(f"Warning: pull failed for {source}: {e}")
        return []

    entries = []
    # The logs hold non-ASCII text, so do not rely on the locale's encoding.
    with open(local_path, encoding="utf-8") as f:
        for line in f:
            if not line.strip():
                continue
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                continue
            # Callers use dict access on every entry; drop anything else.
            if isinstance(record, dict):
                entries.append(record)
    return entries
# --- Conversion ---
def convert_entry(entry: dict, idx: int, source_tag: str) -> dict:
    """Convert a training audit entry to dataset schema format.

    Args:
        entry: One audit record. The keys read are ``input``, ``output``,
            ``mode``, ``player`` and ``category``; all are optional.
        idx: Sequence number, zero-padded to four digits in the example id.
        source_tag: Origin label (e.g. ``"dev"``) used in the id, the
            reasoning text and the ``extracted_from`` metadata.

    Returns:
        A dataset example dict with ``id``, ``source``, ``category``,
        ``input``, ``output`` and ``metadata``. The example is always marked
        ``validated=False``.
    """
    # `or {}` / `or []` also cover keys that are present but null in the log,
    # which `.get(key, default)` alone would pass through as None.
    inp = entry.get("input") or {}
    out = entry.get("output") or {}
    mode = entry.get("mode", "sudo")
    player = entry.get("player", "unknown")
    # Prefer the commands that actually ran; fall back to the generated ones.
    commands = out.get("commands_executed") or out.get("commands_generated") or []
    message = out.get("message", "")
    is_god_mode = mode in ("god", "god_system")

    example = {
        "id": f"audit-{source_tag}-{idx:04d}",
        "source": "sudo_log" if mode == "sudo" else "prayer_log",
        "category": entry.get("category", "command_gen"),
        "input": {
            "user_message": inp.get("user_message", ""),
            "server_context": inp.get("server_context", {}),
        },
        "output": {
            "reasoning": f"Live {source_tag} interaction from {player} via {mode} mode.",
            "commands": commands,
            # The spoken message is only kept for the god modes.
            "message": message if is_god_mode else None,
            "safety_flags": [],
        },
        "metadata": {
            "difficulty": "medium",
            "validated": False,
            "extracted_from": f"training audit log ({source_tag})",
            "scoring_mode": "soft" if is_god_mode else "strict",
        },
    }
    return example
# --- Main ---
def main():
    """Pull audit logs, filter them, and append the survivors to the dataset.

    Command-line options:
        --dry-run   Print the summary and the first five kept examples
                    without writing anything.
        --source    Which log to pull: ``dev``, ``prod`` or ``both``
                    (default ``both``).

    An entry is dropped when it is player feedback, when its user message is
    already in the dataset or was kept earlier in this run, when it has
    neither commands nor a message, when the message script differs from the
    input script, or when the message looks like a system prompt leak. After
    appending, ``data/validate_dataset.py`` is run on the dataset and its
    output is printed.
    """
    parser = argparse.ArgumentParser(description="Ingest training audit logs")
    parser.add_argument("--dry-run", action="store_true")
    parser.add_argument("--source", choices=["dev", "prod", "both"], default="both")
    args = parser.parse_args()

    # Load existing messages (for deduplication) and ids (to avoid reusing one).
    existing_messages = set()
    existing_ids = set()
    if DATASET.exists():
        # Examples are written with ensure_ascii=False, so read as UTF-8.
        with open(DATASET, encoding="utf-8") as f:
            for line in f:
                if line.strip():
                    ex = json.loads(line)
                    existing_messages.add((ex.get("input") or {}).get("user_message", ""))
                    existing_ids.add(ex.get("id"))

    # Pull data
    sources = ["dev", "prod"] if args.source == "both" else [args.source]
    all_entries = []
    for src in sources:
        entries = pull_audit_log(src)
        print(f"Pulled {len(entries)} entries from {src}")
        all_entries.extend([(e, src) for e in entries])

    # Filter and convert
    kept = []
    dropped = {"duplicate": 0, "empty": 0, "lang_mismatch": 0, "prompt_leak": 0, "feedback": 0}
    next_idx = 0
    for entry, src in all_entries:
        # Skip feedback entries (bug_log)
        if entry.get("source") == "player_feedback":
            dropped["feedback"] += 1
            continue

        # `or` fallbacks also cover keys that are present but null.
        inp = entry.get("input") or {}
        out = entry.get("output") or {}
        user_msg = inp.get("user_message", "")
        message = out.get("message", "")
        commands = out.get("commands_executed") or out.get("commands_generated") or []

        # Skip duplicates
        if user_msg in existing_messages:
            dropped["duplicate"] += 1
            continue
        # Skip empty (no commands AND no message)
        if not commands and not message:
            dropped["empty"] += 1
            continue
        # Skip language mismatch (Chinese output for English input, etc.)
        if not languages_match(user_msg, message):
            dropped["lang_mismatch"] += 1
            continue
        # Skip system prompt leaks
        if is_system_prompt_leak(message):
            dropped["prompt_leak"] += 1
            continue

        # Numbering restarts at 0 on every run, so step past any id that an
        # earlier ingestion already wrote to the dataset.
        example = convert_entry(entry, next_idx, src)
        while example["id"] in existing_ids:
            next_idx += 1
            example = convert_entry(entry, next_idx, src)
        next_idx += 1
        existing_ids.add(example["id"])

        kept.append(example)
        existing_messages.add(user_msg)  # prevent within-batch dupes

    print("\nResults:")
    print(f" Total entries: {len(all_entries)}")
    print(f" Kept: {len(kept)}")
    print(" Dropped:")
    for reason, count in sorted(dropped.items()):
        if count > 0:
            print(f" {reason:20} {count}")

    if args.dry_run:
        print(f"\n[DRY RUN] Would append {len(kept)} examples to {DATASET}")
        for ex in kept[:5]:
            msg = ex["input"]["user_message"][:60]
            cmds = len(ex["output"]["commands"])
            print(f" {ex['id']}: {msg} [{cmds} cmds]")
        return

    if not kept:
        print("Nothing to add.")
        return

    # Append to dataset
    DATASET.parent.mkdir(parents=True, exist_ok=True)
    with open(DATASET, "a", encoding="utf-8") as f:
        for ex in kept:
            f.write(json.dumps(ex, ensure_ascii=False) + "\n")
    print(f"\nAppended {len(kept)} examples to {DATASET}")

    # Validate
    result = subprocess.run(
        [sys.executable, str(ROOT / "data" / "validate_dataset.py"), str(DATASET)],
        capture_output=True, text=True
    )
    print(result.stdout)
    if result.returncode != 0:
        print("WARNING: Validation failed!")
        # stdout was already printed above; show the validator's error output.
        print(result.stderr)


if __name__ == "__main__":
    main()