#!/usr/bin/env python3
"""
Dataset merge pipeline for Mortdecai training.

Normalizes all dataset formats into the two record shapes the trainer expects:
  1. `conversations` — [{role, content}, ...] for simple command gen (also used
     for tool-calling sources that provide `messages` but no `qwen3_text`)
  2. `text` — pre-formatted multi-turn tool-calling text, taken from a tool
     source's `qwen3_text` field

Exact-content duplicates are removed FIRST (the earliest source listed in
SOURCES wins; raw training files come last), and only then are the per-source
mix ratios applied — so oversampled copies survive into the output instead of
being stripped out again by deduplication. The merged result is shuffled and
written as a single training-ready JSONL.

Usage:
    # Default merge with recommended ratios
    python3 merge_datasets.py

    # Custom ratios (multipliers per source; `raw_files` is also accepted,
    # and a ratio of 0 excludes a source entirely)
    python3 merge_datasets.py --ratios seed=2.0,tool=1.0,iglu=0.5

    # Dry run — show stats without writing
    python3 merge_datasets.py --dry-run

    # Include chat app exports
    python3 merge_datasets.py --include-chat-logs

    # Skip the data/raw/* training files listed in RAW_TRAINING_FILES
    python3 merge_datasets.py --no-include-raw
"""

import argparse
import json
import hashlib
import random
import sys
from pathlib import Path
from collections import Counter

PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent
sys.path.insert(0, str(PROJECT_ROOT))

from agent.prompts.system_prompts import get_prompt, SYNTAX_RULES, RISK_GRADIENT

# ── Dataset sources ───────────────────────────────────────────────────────────

SOURCES = {
    "seed": {
        "path": "data/processed/seed_dataset.jsonl",
        "format": "seed",
        "default_ratio": 2.0,  # Oversample — keep seed dominant
        "description": "Core command gen with pos/neg pairs",
    },
    "tool": {
        "path": "data/processed/tool_training.jsonl",
        "format": "tool_messages",
        "default_ratio": 1.0,
        "description": "Multi-turn tool-calling examples",
    },
    "tool_v05": {
        "path": "data/processed/tool_training_v05.jsonl",
        "format": "tool_messages",
        "default_ratio": 1.5,  # High quality, oversample
        "description": "0.5.0 quality tool examples",
    },
    "iglu": {
        "path": "data/raw/iglu_build_training.jsonl",
        "format": "tool_messages",
        "default_ratio": 0.8,
        "description": "IGLU building dataset",
    },
    "plugin": {
        "path": "data/raw/plugin_training.jsonl",
        "format": "tool_messages",
        "default_ratio": 1.5,
        "description": "Plugin command examples",
    },
    "exploration": {
        "path": "data/processed/filtered_exploration.jsonl",
        "format": "exploration",
        "default_ratio": 1.0,
        "description": "Wiki-grounded exploration",
    },
    "self_play": {
        "path": "data/processed/self_play.jsonl",
        "format": "self_play",
        "default_ratio": 0.6,  # Large set, don't let it dominate
        "description": "Self-play generations",
    },
    "audit": {
        "path": "data/processed/filtered_audit.jsonl",
        "format": "audit",
        "default_ratio": 0.5,  # Large set, needs dilution
        "description": "Filtered audit log data",
    },
    "distilled": {
        "path": "data/processed/distilled.jsonl",
        "format": "seed",
        "default_ratio": 1.5,  # Gold standard from Claude
        "description": "Claude-distilled examples",
    },
    "chat_logs": {
        "path": "data/chat_logs/training_export.jsonl",
        "format": "audit",
        "default_ratio": 2.0,  # Hand-curated via chat app
        "description": "Chat app training exports",
        "optional": True,
    },
}

# Also include all raw training files (seed-format records, ratio 1.0 unless
# overridden with --ratios raw_files=...).
RAW_TRAINING_FILES = [
    "data/raw/advanced_commands_training.jsonl",
    "data/raw/biome_dimension_training.jsonl",
    "data/raw/chaos_event_training.jsonl",
    "data/raw/chaos_gaps_training.jsonl",
    "data/raw/command_reference_training.jsonl",
    "data/raw/cosmetic_xp_training.jsonl",
    "data/raw/dangerous_effects_training.jsonl",
    "data/raw/death_environment_training.jsonl",
    "data/raw/distance_projectile_training.jsonl",
    "data/raw/distance_scale_training.jsonl",
    "data/raw/enchant_order_errors.jsonl",
    "data/raw/enchantment_training.jsonl",
    "data/raw/entity_mob_training.jsonl",
    "data/raw/entity_targeting_training.jsonl",
    "data/raw/error_correction_training.jsonl",
    "data/raw/event_trigger_training.jsonl",
    "data/raw/execute_chain_training.jsonl",
    "data/raw/fall_safety_training.jsonl",
    "data/raw/gamerule_training.jsonl",
    "data/raw/kill_radius_training.jsonl",
    "data/raw/memory_training.jsonl",
    "data/raw/multiplayer_training.jsonl",
    "data/raw/multistep_training.jsonl",
    "data/raw/paper_training.jsonl",
    "data/raw/prod_pattern_fixes.jsonl",
    "data/raw/quantity_training.jsonl",
    "data/raw/recipe_training.jsonl",
    "data/raw/redstone_training.jsonl",
    "data/raw/revert_and_drops_training.jsonl",
    "data/raw/revert_format_training.jsonl",
    "data/raw/risk_hierarchy_training.jsonl",
    "data/raw/script_tool_training.jsonl",
    "data/raw/suffocation_training.jsonl",
    "data/raw/worldedit_training.jsonl",
]

# ── Format converters ─────────────────────────────────────────────────────────

SUDO_SYSTEM = get_prompt("sudo")
GOD_SYSTEM = get_prompt("god")

# Prepended to every system prompt built below. Presumably the Qwen3 soft
# switch that disables thinking mode — confirm against the trainer.
NO_THINK_PREFIX = "/no_think\n"

# Exceptions that mean "this line has an unexpected shape": bad JSON, a missing
# key, or a non-dict where a dict was expected (e.g. `input` stored as a str).
_RECORD_ERRORS = (json.JSONDecodeError, KeyError, TypeError, AttributeError)


def _build_conversation(system: str, user_msg: str, response: dict) -> dict:
    """Assemble the system/user/assistant `conversations` record.

    The assistant turn is the JSON serialization of *response*; key insertion
    order is preserved by `json.dumps`.
    """
    return {
        "conversations": [
            {"role": "system", "content": NO_THINK_PREFIX + system},
            {"role": "user", "content": user_msg},
            {"role": "assistant", "content": json.dumps(response)},
        ]
    }


def _is_prayer(user_msg: str) -> bool:
    """Return True when *user_msg* starts with "pray " (case-insensitive) — god mode."""
    return user_msg.lower().startswith("pray ")


def _seed_to_conversations(record: dict) -> dict:
    """Convert seed_dataset format to conversations.

    Reads ``input.user_message`` and ``output.{commands, reasoning, message}``.
    Prayers (``pray ...``) use the god prompt and always carry a ``message``
    key (empty string if absent); everything else uses the sudo prompt.
    """
    inp = record.get("input", {})
    out = record.get("output", {})
    user_msg = inp.get("user_message", "")

    response = {"commands": out.get("commands", []), "reasoning": out.get("reasoning", "")}
    if _is_prayer(user_msg):
        response["message"] = out.get("message", "")
        return _build_conversation(GOD_SYSTEM, user_msg, response)
    return _build_conversation(SUDO_SYSTEM, user_msg, response)


def _audit_to_conversations(record: dict) -> dict:
    """Convert audit log format to conversations.

    The prompt is chosen by the record's top-level ``mode`` ("god" or anything
    else → sudo). Commands come from ``output.commands_generated``, falling
    back to ``output.commands`` when that is missing or empty; ``message`` is
    included only when non-empty. No ``reasoning`` key is emitted.
    """
    inp = record.get("input", {})
    out = record.get("output", {})
    system = GOD_SYSTEM if record.get("mode", "sudo") == "god" else SUDO_SYSTEM
    user_msg = inp.get("user_message", "")

    response = {"commands": out.get("commands_generated", []) or out.get("commands", [])}
    message = out.get("message", "")
    if message:
        response["message"] = message
    return _build_conversation(system, user_msg, response)


def _self_play_to_conversations(record: dict) -> dict:
    """Convert self_play format to conversations.

    Like the seed format, but ``message`` may live in ``output`` or at the
    record's top level, and is included only when non-empty.
    """
    inp = record.get("input", {})
    out = record.get("output", {})
    user_msg = inp.get("user_message", "")

    response = {"commands": out.get("commands", []), "reasoning": out.get("reasoning", "")}
    message = out.get("message", record.get("message", ""))
    if message:
        response["message"] = message

    system = GOD_SYSTEM if _is_prayer(user_msg) else SUDO_SYSTEM
    return _build_conversation(system, user_msg, response)


def _exploration_to_conversations(record: dict) -> dict:
    """Convert exploration format to conversations (always sudo mode).

    ``input`` may be a dict with ``user_message`` or a bare value, which is
    stringified and used as the user message.
    """
    inp = record.get("input", {})
    out = record.get("output", {})
    user_msg = inp.get("user_message", "") if isinstance(inp, dict) else str(inp)

    response = {"commands": out.get("commands", []), "reasoning": out.get("reasoning", "")}
    return _build_conversation(SUDO_SYSTEM, user_msg, response)


def _tool_messages_passthrough(record: dict) -> "dict | None":
    """Tool training already has messages — pass through or use qwen3_text.

    Prefers a non-empty string ``qwen3_text`` (emitted as ``text``), else a
    non-empty ``messages`` list (emitted as ``conversations``). Returns None
    when neither is usable so the loader drops the record instead of emitting
    an empty/None training example.
    """
    text = record.get("qwen3_text")
    if isinstance(text, str) and text:
        return {"text": text}
    messages = record.get("messages")
    if isinstance(messages, list) and messages:
        return {"conversations": messages}
    return None


def _raw_training_to_conversations(record: dict) -> dict:
    """Convert raw training files (same as seed format)."""
    return _seed_to_conversations(record)


CONVERTERS = {
    "seed": _seed_to_conversations,
    "tool_messages": _tool_messages_passthrough,
    "audit": _audit_to_conversations,
    "self_play": _self_play_to_conversations,
    "exploration": _exploration_to_conversations,
    "raw_training": _raw_training_to_conversations,
}

# ── Pipeline ──────────────────────────────────────────────────────────────────


def _message_text(msg: dict) -> str:
    """Return a message's content as a str: '' for missing/None, JSON for non-str."""
    content = msg.get("content")
    if content is None:
        return ""
    if isinstance(content, str):
        return content
    return json.dumps(content, sort_keys=True)


def dedup_key(record: dict) -> str:
    """Generate a dedup key from the training content.

    - ``text`` records: the first 500 chars of the text.
    - ``conversations`` records: the last user message plus the first
      non-empty assistant message, each truncated to 200 chars.
    - anything else: the first 500 chars of the record's JSON dump.

    Returns the MD5 hex digest of that content (a fingerprint, not security).
    """
    if "text" in record:
        content = str(record["text"])[:500]
    elif "conversations" in record:
        user = ""
        asst = ""
        for msg in record["conversations"]:
            if not isinstance(msg, dict):
                continue
            role = msg.get("role")
            if role == "user":
                # Later user turns overwrite earlier ones: the last one is keyed.
                user = _message_text(msg)[:200]
            elif role == "assistant" and not asst:
                asst = _message_text(msg)[:200]
        content = user + "|" + asst
    else:
        content = json.dumps(record)[:500]
    return hashlib.md5(content.encode()).hexdigest()


def _read_converted(path: Path, converter) -> tuple:
    """Read a JSONL file and run *converter* on every non-blank line.

    Returns ``(records, line_count, skipped)``: the converted records, the
    number of non-blank lines read, and how many of those were dropped because
    they were malformed, had an unexpected shape, or converted to nothing.
    """
    records = []
    line_count = 0
    skipped = 0
    with open(path, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            line_count += 1
            try:
                converted = converter(json.loads(line))
            except _RECORD_ERRORS:
                skipped += 1
                continue
            if converted:
                records.append(converted)
            else:
                skipped += 1
    return records, line_count, skipped


def _drop_seen(records: list, seen: set) -> list:
    """Return the records whose dedup_key is not yet in *seen*, recording new keys.

    Duplicates within *records* itself are removed too; first occurrence wins.
    *seen* is mutated in place.
    """
    unique = []
    for record in records:
        key = dedup_key(record)
        if key not in seen:
            seen.add(key)
            unique.append(record)
    return unique


def _apply_ratio(records: list, ratio: float) -> list:
    """Over- or down-sample *records* by *ratio* using the module-level RNG.

    - ratio > 1: every record repeated ``int(ratio)`` times, plus a random
      sample (without replacement) of ``int(len * fractional_part)`` extras.
    - ratio < 1: a random subset of ``max(1, int(len * ratio))`` records.
    - ratio == 1: *records* unchanged.
    - ratio <= 0, or no records: an empty list (also avoids the ValueError
      ``random.sample([], 1)`` would raise).
    """
    if not records or ratio <= 0:
        return []
    if ratio > 1.0:
        full_copies = int(ratio)
        extra_count = int(len(records) * (ratio - full_copies))
        oversampled = records * full_copies
        if extra_count:
            oversampled.extend(random.sample(records, extra_count))
        return oversampled
    if ratio < 1.0:
        return random.sample(records, max(1, int(len(records) * ratio)))
    return records


def load_and_convert(source_name: str, meta: dict, ratio: float, seen: set = None) -> list:
    """Load a source file, convert to training format, apply ratio.

    Args:
        source_name: source key, used in the "not found" warning.
        meta: a SOURCES entry (``path``, ``format``, optional ``optional``).
        ratio: mix multiplier; see ``_apply_ratio`` (<= 0 yields no records).
        seen: optional set of dedup keys. When given, records already present
            in it (or repeated within this source) are dropped BEFORE the ratio
            is applied and the set is updated, so oversampled copies are not
            later lost to deduplication.

    Returns:
        The converted (and resampled) records; an empty list when the file is
        missing (with a warning unless ``meta["optional"]`` is truthy).
        Unparseable or unconvertible lines are skipped.
    """
    path = PROJECT_ROOT / meta["path"]
    if not path.exists():
        if not meta.get("optional"):
            print(f"  WARNING: {path} not found, skipping {source_name}")
        return []

    records, _, _ = _read_converted(path, CONVERTERS[meta["format"]])
    if seen is not None:
        records = _drop_seen(records, seen)
    return _apply_ratio(records, ratio)


def _parse_ratio_overrides(spec: str, parser: argparse.ArgumentParser) -> dict:
    """Parse ``name=number,name=number`` into a dict of ratio overrides.

    Empty items (e.g. a trailing comma) are ignored. Malformed items and
    ratios that are negative, NaN or infinite exit via ``parser.error``.
    """
    overrides = {}
    for item in spec.split(","):
        item = item.strip()
        if not item:
            continue
        name, sep, value = item.partition("=")
        name = name.strip()
        if not sep or not name:
            parser.error(f"--ratios: expected name=number, got {item!r}")
        try:
            ratio = float(value)
        except ValueError:
            parser.error(f"--ratios: {name}={value.strip()!r} is not a number")
        # NaN fails every comparison, so this also rejects it.
        if not 0 <= ratio < float("inf"):
            parser.error(f"--ratios: {name} must be a finite number >= 0, got {value.strip()}")
        overrides[name] = ratio
    return overrides


def _report(label: str, line_count: int, unique_count: int, ratio: float,
            final_count: int, note: str) -> None:
    """Print one per-source stats row."""
    print(f"  {label:<20s} {line_count:>6} raw {unique_count:>6} unique"
          f"  x{ratio:.1f} = {final_count:>7}  ({note})")


def main():
    """Parse CLI args, merge every source (dedup first, then ratios), shuffle and write."""
    parser = argparse.ArgumentParser(description="Merge datasets for Mortdecai training")
    parser.add_argument("--output", type=Path,
                        default=PROJECT_ROOT / "data" / "processed" / "merged_training_v06.jsonl")
    parser.add_argument("--ratios", default="",
                        help="Override ratios: seed=2.0,tool=1.0,iglu=0.5")
    parser.add_argument("--dry-run", action="store_true",
                        help="Print stats without writing output")
    parser.add_argument("--include-chat-logs", action="store_true",
                        help="Include chat app training exports")
    parser.add_argument("--include-raw", action="store_true", default=True,
                        help="Include raw training files (default: true)")
    # `--include-raw` is store_true with default=True, so on its own it can
    # never turn the raw files off; this flag writes False to the same dest.
    parser.add_argument("--no-include-raw", dest="include_raw", action="store_false",
                        help="Skip the raw training files")
    parser.add_argument("--seed", type=int, default=42,
                        help="Random seed for reproducibility")
    args = parser.parse_args()

    random.seed(args.seed)
    ratio_overrides = _parse_ratio_overrides(args.ratios, parser)

    active_sources = dict(SOURCES)
    if not args.include_chat_logs:
        active_sources.pop("chat_logs", None)

    print("Mortdecai Dataset Merge Pipeline")
    print("=" * 60)
    print()

    known_names = set(SOURCES) | {"raw_files"}
    for name in sorted(set(ratio_overrides) - known_names):
        print(f"  WARNING: --ratios entry {name!r} matches no source; ignored")

    seen = set()      # dedup keys shared across ALL sources
    merged = []
    totals = Counter()

    # Named sources, in SOURCES order: earlier sources win duplicates.
    for name, meta in active_sources.items():
        ratio = ratio_overrides.get(name, meta["default_ratio"])
        path = PROJECT_ROOT / meta["path"]
        records, line_count, skipped = [], 0, 0
        if path.exists():
            records, line_count, skipped = _read_converted(path, CONVERTERS[meta["format"]])
        elif not meta.get("optional"):
            print(f"  WARNING: {path} not found, skipping {name}")

        unique = _drop_seen(records, seen)
        sampled = _apply_ratio(unique, ratio)
        merged.extend(sampled)
        totals["loaded"] += len(records)
        totals["duplicates"] += len(records) - len(unique)
        totals["skipped"] += skipped
        _report(name, line_count, len(unique), ratio, len(sampled), meta["description"])

    # Raw training files, pooled together as one "raw_files" source.
    if args.include_raw:
        raw_ratio = ratio_overrides.get("raw_files", 1.0)
        converter = CONVERTERS["raw_training"]
        raw_records = []
        raw_lines = 0
        files_found = 0
        for filepath in RAW_TRAINING_FILES:
            path = PROJECT_ROOT / filepath
            if not path.exists():
                continue
            files_found += 1
            records, line_count, skipped = _read_converted(path, converter)
            raw_records.extend(records)
            raw_lines += line_count
            totals["skipped"] += skipped

        unique = _drop_seen(raw_records, seen)
        sampled = _apply_ratio(unique, raw_ratio)
        merged.extend(sampled)
        totals["loaded"] += len(raw_records)
        totals["duplicates"] += len(raw_records) - len(unique)
        _report("raw_files", raw_lines, len(unique), raw_ratio, len(sampled),
                f"{files_found}/{len(RAW_TRAINING_FILES)} files")

    print()
    print(f"  Records loaded:     {totals['loaded']}")
    print(f"  Duplicates removed: {totals['duplicates']} (before ratios)")
    if totals["skipped"]:
        print(f"  Lines skipped:      {totals['skipped']} (malformed or unconvertible)")
    print(f"  Total after ratios: {len(merged)}")

    text_count = sum(1 for r in merged if "text" in r)
    conv_count = sum(1 for r in merged if "conversations" in r)
    print(f"  Format: {conv_count} conversations, {text_count} pre-formatted text")

    random.shuffle(merged)

    if args.dry_run:
        print("\n  [DRY RUN] No output written.")
        return

    args.output.parent.mkdir(parents=True, exist_ok=True)
    with open(args.output, "w", encoding="utf-8") as f:
        f.writelines(json.dumps(r, ensure_ascii=False) + "\n" for r in merged)

    print(f"\n  Wrote {len(merged)} examples to {args.output}")
    print(f"  File size: {args.output.stat().st_size / 1e6:.1f} MB")


if __name__ == "__main__":
    main()