Prompt pipeline: 1660 generates, bigger GPUs process via Mortdecai

Architecture:
- 1660 Super (qwen3.5:0.8b) generates diverse edge-case prompts
- 2080 Ti / RTX 4000 / 3090 Ti process through Mortdecai + RCON validation
- File-based queue with locking for multi-GPU coordination
- 10 prompt categories targeting known weaknesses

Categories: fill_syntax, enchantments, execute_chains, entity_targeting,
gamerules_timed, memory_commands, creative_prayers, edge_items,
multicommand, natural_language

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Mortdecai
2026-03-21 00:08:48 -04:00
parent 3c1cbfce39
commit 434589d098
+330
View File
@@ -0,0 +1,330 @@
#!/usr/bin/env python3
"""
Prompt Pipeline — 1660 generates prompts, bigger GPUs process them through Mortdecai.
Architecture:
1660 Super (qwen3.5:0.8b) → generates diverse edge-case prompts
→ writes to shared queue file
2080 Ti / RTX 4000 / 3090 Ti (mortdecai) → reads queue, processes, validates via RCON
Usage:
# Terminal 1: prompt generator (runs on 1660)
python3 prompt_pipeline.py --mode generate --ollama-url http://192.168.0.235:11434
# Terminal 2+: processors (one per GPU)
python3 prompt_pipeline.py --mode process --ollama-url http://192.168.0.141:11434
python3 prompt_pipeline.py --mode process --ollama-url http://192.168.0.179:11434
# Or run everything:
python3 prompt_pipeline.py --mode all
"""
import argparse
import json
import os
import re
import random
import sys
import time
import threading
import fcntl
from pathlib import Path
import requests
ROOT = Path(__file__).resolve().parent.parent.parent
sys.path.insert(0, str(ROOT))
from agent.tools.persistent_rcon import get_rcon
QUEUE_FILE = ROOT / "data" / "processed" / "prompt_queue.jsonl"
OUTPUT_FILE = ROOT / "data" / "processed" / "pipeline_output.jsonl"
LOCK_FILE = str(QUEUE_FILE) + ".lock"
# Categories the model needs more training on (based on validator hit rates and bake-off failures)
GENERATION_PROMPTS = {
"fill_syntax": """Generate 5 Minecraft player chat messages that involve filling areas with blocks.
Include: walls, floors, replacing blocks, clearing areas, building structures.
The AI often gets fill syntax wrong — adding a trailing count number or wrong coordinates.
Every message must start with "sudo ". Return ONLY a JSON array of strings.""",
"enchantments": """Generate 5 Minecraft player chat messages requesting enchanted items.
Include: multiple enchantments, max level enchants, mutually exclusive combinations, items that cant be enchanted.
The AI sometimes uses old NBT syntax instead of 1.21 component syntax.
Every message must start with "sudo " or "pray ". Return ONLY a JSON array of strings.""",
"execute_chains": """Generate 5 Minecraft player chat messages requiring execute command chains.
Include: execute as/at/if/positioned/in, targeting by gamemode/team, dimension switching, conditional blocks.
Every message must start with "sudo ". Return ONLY a JSON array of strings.""",
"entity_targeting": """Generate 5 Minecraft player chat messages about killing or targeting specific entities.
Include: "kill THE zombie" vs "kill ALL zombies", distance-based targeting, entity types, nearby vs far.
Every message must start with "sudo ". Return ONLY a JSON array of strings.""",
"gamerules_timed": """Generate 5 Minecraft player chat messages requesting temporary gamerule changes.
Include: specific durations like "for 5 minutes", "for an hour", "briefly", "permanently".
The AI needs to learn when to set revert timers vs permanent changes.
Every message must start with "sudo ". Return ONLY a JSON array of strings.""",
"memory_commands": """Generate 5 Minecraft player chat messages about remembering or recalling locations and facts.
Include: "remember this as home", "tp me home", "what do you know about me", "forget my base".
Every message must start with "sudo " or "pray ". Return ONLY a JSON array of strings.""",
"creative_prayers": """Generate 5 creative Minecraft prayers to an AI God character.
Include: humble requests, greedy demands, blasphemy, roleplay, emotional appeals, sarcasm.
Be creative and diverse. Every message must start with "pray ". Return ONLY a JSON array of strings.""",
"edge_items": """Generate 5 Minecraft player requests for obscure or easily-confused items.
Include: items with color variants (wool, concrete, glass), items that changed names between versions,
items people misspell (steak vs cooked_beef, log vs oak_log, bed vs white_bed).
Every message must start with "sudo ". Return ONLY a JSON array of strings.""",
"multicommand": """Generate 5 Minecraft player requests that need multiple commands to fulfill.
Include: "gear me up for the nether", "build a house and tp me inside", "create a pvp arena",
"prepare me for a boss fight". Complex multi-step requests.
Every message must start with "sudo " or "pray ". Return ONLY a JSON array of strings.""",
"natural_language": """Generate 5 Minecraft chat messages phrased in unusual natural language.
Include: typos, slang, abbreviations, run-on sentences, vague requests, sarcasm, mixed languages.
The AI should still figure out what the player wants.
Every message must start with "sudo " or "pray ". Return ONLY a JSON array of strings.""",
}
def ensure_prefix(msg):
lower = msg.lower().strip()
if lower.startswith("pray ") or lower.startswith("sudo ") or lower.startswith("bug_log"):
return msg
return ("pray " if random.random() < 0.4 else "sudo ") + msg
def generate_prompts(ollama_url, model="qwen3.5:0.8b", count=50):
"""Use a small model to generate diverse prompts."""
prompts = []
categories = list(GENERATION_PROMPTS.keys())
random.shuffle(categories)
for cat in categories:
if len(prompts) >= count:
break
try:
r = requests.post(f"{ollama_url}/api/chat", json={
"model": model,
"messages": [
{"role": "system", "content": "You generate Minecraft chat messages for AI training. Return ONLY a JSON array of strings."},
{"role": "user", "content": GENERATION_PROMPTS[cat]},
],
"stream": False,
"options": {"temperature": 1.1, "num_predict": 400},
}, timeout=60)
content = r.json()["message"]["content"]
content = re.sub(r'<think>[\s\S]*?</think>\s*', '', content)
cleaned = content.replace("```json", "").replace("```", "").strip()
match = re.search(r'\[[\s\S]*\]', cleaned)
if match:
items = json.loads(match.group())
for item in items:
if isinstance(item, str) and item.strip():
prompts.append({
"prompt": ensure_prefix(item.strip()),
"category": cat,
"generated_at": time.strftime("%Y-%m-%dT%H:%M:%SZ"),
})
print(f" [{cat}] +{len(items) if match else 0} prompts")
except Exception as e:
print(f" [{cat}] ERROR: {e}")
time.sleep(0.5)
return prompts
def write_queue(prompts):
"""Append prompts to the queue file with file locking."""
os.makedirs(os.path.dirname(QUEUE_FILE), exist_ok=True)
with open(LOCK_FILE, "w") as lf:
fcntl.flock(lf, fcntl.LOCK_EX)
with open(QUEUE_FILE, "a") as f:
for p in prompts:
f.write(json.dumps(p) + "\n")
fcntl.flock(lf, fcntl.LOCK_UN)
print(f" Queued {len(prompts)} prompts")
def read_queue(batch_size=5):
"""Read and remove prompts from the queue."""
if not QUEUE_FILE.exists():
return []
with open(LOCK_FILE, "w") as lf:
fcntl.flock(lf, fcntl.LOCK_EX)
with open(QUEUE_FILE) as f:
lines = f.readlines()
taken = lines[:batch_size]
remaining = lines[batch_size:]
with open(QUEUE_FILE, "w") as f:
f.writelines(remaining)
fcntl.flock(lf, fcntl.LOCK_UN)
return [json.loads(l) for l in taken if l.strip()]
def process_prompt(prompt_data, ollama_url, model, rcon_host, rcon_port, rcon_pass):
"""Process a single prompt through Mortdecai and validate via RCON."""
prompt = prompt_data["prompt"]
category = prompt_data.get("category", "unknown")
mode = "god" if prompt.lower().startswith("pray ") else "sudo"
system = ('You are God in a Minecraft server. Return JSON: {"message": "...", "commands": ["cmd1"], "reasoning": "why"}'
if mode == "god" else
'You are a Minecraft 1.21 command translator. Return JSON: {"commands": ["cmd1"], "reasoning": "why"}\n'
'IMPORTANT: Each command is ONE COMPLETE STRING. Use minecraft: prefix. Enchantments: item[enchantments={name:level}].')
try:
r = requests.post(f"{ollama_url}/api/chat", json={
"model": model,
"messages": [
{"role": "system", "content": "/no_think\n" + system},
{"role": "user", "content": f"Player slingshooter08: {prompt}"},
],
"stream": False, "format": "json",
"options": {"temperature": 0.3, "num_predict": 500},
}, timeout=120)
content = r.json()["message"]["content"]
content = re.sub(r'<think>[\s\S]*?</think>\s*', '', content)
parsed = json.loads(content)
except Exception as e:
return {"prompt": prompt, "category": category, "error": str(e), "success": False}
commands = parsed.get("commands") or []
if not commands or not all(isinstance(c, str) and " " in c for c in commands):
return {"prompt": prompt, "category": category, "commands": commands, "success": False, "reason": "empty_or_split"}
# Validate via RCON
rcon = get_rcon(rcon_host, rcon_port, rcon_pass)
results = []
all_ok = True
for cmd in commands[:6]:
try:
result = rcon.command(cmd)
is_error = "<--[HERE]" in result or "Unknown" in result or "Incorrect" in result
results.append({"cmd": cmd, "result": result[:150], "ok": not is_error})
if is_error:
all_ok = False
except:
results.append({"cmd": cmd, "result": "RCON error", "ok": False})
all_ok = False
return {
"prompt": prompt,
"category": category,
"mode": mode,
"commands": commands,
"rcon_results": results,
"success": all_ok,
"message": parsed.get("message", ""),
"reasoning": parsed.get("reasoning", ""),
"processed_at": time.strftime("%Y-%m-%dT%H:%M:%SZ"),
"ollama_url": ollama_url,
}
def save_result(result):
"""Save a processed result."""
os.makedirs(os.path.dirname(OUTPUT_FILE), exist_ok=True)
with open(OUTPUT_FILE, "a") as f:
f.write(json.dumps(result, ensure_ascii=False) + "\n")
def run_generator(ollama_url, model, interval=120):
"""Continuously generate prompts."""
print(f"Prompt Generator started")
print(f" Model: {model} on {ollama_url}")
print(f" Interval: {interval}s between batches")
while True:
print(f"\nGenerating batch at {time.strftime('%H:%M:%S')}...")
prompts = generate_prompts(ollama_url, model, count=30)
if prompts:
write_queue(prompts)
print(f" Queue size: {sum(1 for _ in open(QUEUE_FILE)) if QUEUE_FILE.exists() else 0}")
time.sleep(interval)
def run_processor(ollama_url, model, rcon_host, rcon_port, rcon_pass):
"""Continuously process prompts from the queue."""
print(f"Processor started")
print(f" Model: {model} on {ollama_url}")
print(f" RCON: {rcon_host}:{rcon_port}")
stats = {"processed": 0, "success": 0, "failed": 0}
while True:
batch = read_queue(3)
if not batch:
time.sleep(5)
continue
for prompt_data in batch:
result = process_prompt(prompt_data, ollama_url, model, rcon_host, rcon_port, rcon_pass)
save_result(result)
stats["processed"] += 1
if result.get("success"):
stats["success"] += 1
print(f" OK [{result['category'][:12]}] {result['prompt'][:50]}")
else:
stats["failed"] += 1
print(f" FAIL [{result['category'][:12]}] {result['prompt'][:50]}")
if stats["processed"] % 20 == 0:
rate = stats["success"] / max(stats["processed"], 1) * 100
print(f" Stats: {stats['processed']} processed, {rate:.0f}% success")
time.sleep(0.2)
def main():
parser = argparse.ArgumentParser(description="Prompt generation pipeline")
parser.add_argument("--mode", choices=["generate", "process", "all"], required=True)
parser.add_argument("--gen-url", default="http://192.168.0.235:11434", help="Generator Ollama URL (small model)")
parser.add_argument("--gen-model", default="qwen3.5:0.8b", help="Generator model")
parser.add_argument("--proc-urls", default="http://192.168.0.141:11434,http://192.168.0.179:11434",
help="Processor Ollama URLs (comma-separated)")
parser.add_argument("--proc-model", default="mortdecai:0.4.0", help="Processor model")
parser.add_argument("--rcon-host", default="192.168.0.244")
parser.add_argument("--rcon-port", type=int, default=25578)
parser.add_argument("--rcon-pass", default="REDACTED_RCON")
parser.add_argument("--interval", type=int, default=120, help="Generator batch interval (seconds)")
args = parser.parse_args()
if args.mode == "generate":
run_generator(args.gen_url, args.gen_model, args.interval)
elif args.mode == "process":
urls = args.proc_urls.split(",")
run_processor(urls[0], args.proc_model, args.rcon_host, args.rcon_port, args.rcon_pass)
elif args.mode == "all":
urls = args.proc_urls.split(",")
# Start generator in background
gen_thread = threading.Thread(target=run_generator,
args=(args.gen_url, args.gen_model, args.interval), daemon=True)
gen_thread.start()
# Start a processor per URL
for url in urls:
proc_thread = threading.Thread(target=run_processor,
args=(url, args.proc_model, args.rcon_host, args.rcon_port, args.rcon_pass),
daemon=True)
proc_thread.start()
print(f"Processor started on {url}")
# Keep main alive
while True:
time.sleep(60)
queue_size = sum(1 for _ in open(QUEUE_FILE)) if QUEUE_FILE.exists() else 0
output_size = sum(1 for _ in open(OUTPUT_FILE)) if OUTPUT_FILE.exists() else 0
print(f"[Pipeline] Queue: {queue_size} | Output: {output_size}")
if __name__ == "__main__":
main()