#!/usr/bin/env python3
"""
Scrape Minecraft server logs from GitHub to extract command examples for training.

Searches public repos for server log files, filters for 1.20.5+ versions,
extracts player/console/RCON commands, and converts them to the project's
JSONL training schema.

Usage:
    python3 data/scrape_server_logs.py
    python3 data/scrape_server_logs.py --dry-run --max-repos 10
    python3 data/scrape_server_logs.py --output-dir /tmp/scraped
"""

import argparse
import json
import os
import re
import subprocess
import sys
import time
import uuid
from pathlib import Path
from typing import Optional
from urllib.parse import quote

# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

SEARCH_QUERIES = [
    'minecraft server.log "issued server command"',
    'minecraft latest.log "issued server command"',
    'minecraft latest.log rcon',
    'paper server "executed command"',
    'worldedit "//set"',
    'minecraft "gamemode" "give" server command',
]

# Versions we accept: 1.20.5, 1.20.6, 1.21, 1.21.x
MIN_VERSION = (1, 20, 5)

VERSION_PATTERNS = [
    # "Starting minecraft server version 1.21.1"
    re.compile(r"Starting minecraft server version\s+([\d.]+)", re.IGNORECASE),
    # "This server is running Paper version 1.21.1-..."
    re.compile(r"This server is running\s+\S+\s+version\s+([\d.]+)", re.IGNORECASE),
    # "Paper version git-Paper-123 (MC: 1.21.1)"
    re.compile(r"\(MC:\s*([\d.]+)\)", re.IGNORECASE),
    # "Server version: 1.21.1"
    re.compile(r"Server version[:\s]+([\d.]+)", re.IGNORECASE),
    # Spigot / Purpur / Folia variants
    re.compile(r"Implementing API version\s+([\d.]+)", re.IGNORECASE),
]

# Command extraction patterns
# Group 1 = player, Group 2 = command (with leading /)
CMD_ISSUED = re.compile(
    r"(\w{3,16})\s+issued server command:\s+(/.+)", re.IGNORECASE
)

# Alternate formats: [player: issued server command: /cmd] or <player> issued ...
CMD_ISSUED_ALT = re.compile(
    r"[<\[](\w{3,16})[>:\]]\s+issued server command:\s+(/.+)", re.IGNORECASE
)

# RCON: "RCON executing: /command" or "Rcon Executing console command: /cmd"
RCON_CMD = re.compile(
    r"RCON\s+(?:executing|Executing)[^/]*(/.+)", re.IGNORECASE
)

# [Server] /command (console)
CONSOLE_CMD = re.compile(
    r"\[Server\]\s+(/.+)", re.IGNORECASE
)

# WorldEdit: player used //set stone etc. (via log)
WORLDEDIT_CMD = re.compile(
    r"(\w+)\s+used\s+(//\w+.+)", re.IGNORECASE
)

# Generic WorldEdit commands found directly in text.
# NOTE: not used by extract_commands() in this module; kept for importers.
WORLDEDIT_INLINE = re.compile(
    r"(//(?:set|replace|copy|paste|cut|move|stack|undo|redo|fill|walls|"
    r"outline|sphere|cyl|hcyl|hsphere|drain|fixwater|snow|thaw|green|"
    r"regen|overlay|naturalize|deform|hollow|center|pos1|pos2|wand|"
    r"expand|contract|shift|sel|count|distr)\b\S*(?:\s+\S+)*)"
)

MAX_FILE_SIZE = 10 * 1024 * 1024  # 10 MB

# Mapping command names to natural-language templates for user_message synthesis.
# NOTE: not referenced by synthesize_user_message() in this module; kept for
# importers.
COMMAND_NL_TEMPLATES = {
    "give": "give {player} {args}",
    "gamemode": "switch {player} to {args} mode",
    "tp": "teleport {player} {args}",
    "teleport": "teleport {player} {args}",
    "time": "set the time to {args}",
    "weather": "change the weather to {args}",
    "effect": "apply effect {args} to {player}",
    "kill": "kill {target}",
    "summon": "summon {args}",
    "setblock": "place {args}",
    "fill": "fill area with {args}",
    "enchant": "enchant {args}",
    "clear": "clear {player}'s inventory",
    "xp": "give xp {args}",
    "experience": "give experience {args}",
    "ban": "ban {player}",
    "kick": "kick {player}",
    "op": "make {player} an operator",
    "deop": "remove operator from {player}",
    "msg": "message {args}",
    "tell": "message {args}",
    "say": "announce {args}",
    "difficulty": "set difficulty to {args}",
    "spawnpoint": "set spawn point {args}",
    "setworldspawn": "set world spawn {args}",
    "gamerule": "set gamerule {args}",
    "particle": "create particle {args}",
    "playsound": "play sound {args}",
    "title": "show title {args}",
    "scoreboard": "scoreboard {args}",
    "execute": "execute {args}",
    "data": "modify data {args}",
    "attribute": "modify attribute {args}",
}

# First words of commands that are dropped from the training set. This is a
# blocklist of plugin/utility commands; anything not listed here is kept.
SKIP_PREFIXES = frozenset({
    "pl", "plugins", "ver", "version", "about", "help", "?",
    "tps", "spark", "perm", "lp", "luckperms",
    "essentials", "eco", "economy", "vault",
    "cmi", "nucleus", "chat", "party", "guild",
    "clan", "faction", "f", "home", "sethome",
    "warp", "setwarp", "spawn", "hub", "lobby",
    "menu", "shop", "ah", "auction", "buy", "sell",
    "pay", "bal", "balance", "money",
    "trade", "market", "store",
    "rank", "rankup", "prestige",
    "level", "stats", "vote", "reward",
    "crate", "key", "kit",
    "fly", "god", "vanish", "nick", "nickname",
    "dynmap", "map", "bluemap",
    "worldguard", "wg", "region",
    "towny", "town", "nation", "plot", "resident",
    "mcmmo", "mining", "excavation", "repair",
    "jobs", "quests", "quest",
    "discord", "link",
})


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def run_gh(args: list[str], timeout: int = 30) -> Optional[str]:
    """Run a gh CLI command and return stdout, or None on error.

    If the first attempt fails and stderr mentions a rate limit (or contains
    "403"), the call sleeps 60 seconds and retries exactly once.

    Args:
        args: Arguments passed to the ``gh`` executable.
        timeout: Per-attempt timeout in seconds.

    Returns:
        The command's stdout as text, or None if it failed or timed out.
    """
    cmd = ["gh"] + args

    def _invoke() -> subprocess.CompletedProcess:
        # Decode leniently: raw blobs fetched through gh may not be valid UTF-8.
        return subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
            timeout=timeout,
        )

    try:
        result = _invoke()
        if result.returncode != 0:
            stderr = result.stderr.strip()
            rate_limited = "rate limit" in stderr.lower() or "403" in stderr
            if not rate_limited:
                return None
            print(" [rate-limit] Sleeping 60s ...")
            time.sleep(60)
            result = _invoke()
            if result.returncode != 0:
                return None
        return result.stdout
    except subprocess.TimeoutExpired:
        print(f" [timeout] gh command timed out: {' '.join(cmd[:6])}")
        return None
    except Exception as e:  # best-effort boundary: report and carry on
        print(f" [error] gh command failed: {e}")
        return None


def parse_version(version_str: str) -> Optional[tuple]:
    """Parse '1.21.1' into (1, 21, 1). Returns None on failure.

    Short versions are zero-padded to three components and longer ones are
    truncated. Leading/trailing dots are ignored because the version regexes
    capture ``[\\d.]+`` and can pick up a sentence-ending period.
    """
    cleaned = version_str.strip().strip(".")
    try:
        nums = tuple(int(part) for part in cleaned.split("."))
    except ValueError:
        return None
    # Pad to 3 components, then keep exactly 3.
    return (nums + (0, 0, 0))[:3]


def version_acceptable(version_str: str) -> bool:
    """Return True if version >= 1.20.5 (MIN_VERSION); False if unparseable."""
    parsed = parse_version(version_str)
    return parsed is not None and parsed >= MIN_VERSION


def detect_version(text: str) -> Optional[str]:
    """Try to detect the Minecraft server version from log text.

    Patterns in VERSION_PATTERNS are tried in order; the first one that
    matches anywhere in ``text`` wins. Returns the captured version string,
    or None if no pattern matches.
    """
    for pattern in VERSION_PATTERNS:
        match = pattern.search(text)
        if match:
            return match.group(1)
    return None


def extract_commands(text: str) -> list[dict]:
    """
    Extract commands from log text.

    Each line is tested against the player, RCON, console and WorldEdit
    patterns in that order; the first match wins. For non-WorldEdit matches
    the following log line (stripped) is recorded as the presumed response.

    Returns list of dicts: {player, command, source_type, response}
    """
    results = []
    lines = text.splitlines()

    for i, line in enumerate(lines):
        next_line = lines[i + 1].strip() if i + 1 < len(lines) else ""
        response = next_line if next_line else None

        # Player issued server command
        m = CMD_ISSUED.search(line) or CMD_ISSUED_ALT.search(line)
        if m:
            results.append({
                "player": m.group(1),
                "command": m.group(2).strip(),
                "source_type": "player_command",
                "response": response,
            })
            continue

        # RCON
        m = RCON_CMD.search(line)
        if m:
            results.append({
                "player": "RCON",
                "command": m.group(1).strip(),
                "source_type": "rcon",
                "response": response,
            })
            continue

        # Console
        m = CONSOLE_CMD.search(line)
        if m:
            results.append({
                "player": "Console",
                "command": m.group(1).strip(),
                "source_type": "console",
                "response": response,
            })
            continue

        # WorldEdit (player used //cmd)
        m = WORLDEDIT_CMD.search(line)
        if m:
            results.append({
                "player": m.group(1),
                "command": m.group(2).strip(),
                "source_type": "worldedit",
                "response": None,
            })

    return results


def synthesize_user_message(command: str, player: str) -> str:
    """
    Convert a raw command like '/give player diamond_sword 1' into a
    natural language request like 'give me a diamond sword'.

    Only the single command-prefix slash is removed, so WorldEdit commands
    ('//set stone') keep one slash and reach the WorldEdit branch below.
    ``player`` is accepted for interface compatibility but is not used.
    """
    # Strip exactly one leading "/" (lstrip would also eat WorldEdit's second).
    cmd = command.removeprefix("/")
    parts = cmd.split(None, 1)
    if not parts:
        return cmd

    cmd_name = parts[0].lower()
    args_str = parts[1] if len(parts) > 1 else ""

    # Try to produce something reasonable
    if cmd_name == "give" and args_str:
        # /give player item count
        give_parts = args_str.split()
        if len(give_parts) >= 2:
            item = give_parts[1].replace("minecraft:", "").replace("_", " ")
            count = give_parts[2] if len(give_parts) > 2 else "1"
            if count == "1":
                return f"give me a {item}"
            return f"give me {count} {item}"

    if cmd_name == "gamemode" and args_str:
        gm_parts = args_str.split()
        mode = gm_parts[0] if gm_parts else args_str
        return f"put me in {mode} mode"

    if cmd_name in ("tp", "teleport") and args_str:
        return f"teleport to {args_str}"

    if cmd_name == "time" and args_str:
        return f"set the time to {args_str.replace('set ', '')}"

    if cmd_name == "weather" and args_str:
        return f"make the weather {args_str}"

    if cmd_name == "effect" and args_str:
        effect_parts = args_str.split()
        # effect give player effect_name ...
        if len(effect_parts) >= 3 and effect_parts[0] == "give":
            eff_name = effect_parts[2].replace("minecraft:", "").replace("_", " ")
            return f"give me {eff_name} effect"
        return f"apply effect {args_str}"

    if cmd_name == "kill":
        return f"kill {args_str if args_str else 'me'}"

    if cmd_name == "summon" and args_str:
        entity = args_str.split()[0].replace("minecraft:", "").replace("_", " ")
        return f"summon a {entity}"

    if cmd_name in ("setblock", "fill") and args_str:
        # Try to find the block name
        block_match = re.search(r"minecraft:(\w+)", args_str)
        if block_match:
            block = block_match.group(1).replace("_", " ")
            if cmd_name == "fill":
                return f"fill the area with {block}"
            return f"place a {block} block"
        return f"{cmd_name} {args_str}"

    if cmd_name == "difficulty" and args_str:
        return f"set difficulty to {args_str}"

    if cmd_name == "gamerule" and args_str:
        return f"set gamerule {args_str}"

    if cmd_name.startswith("/"):
        # WorldEdit command
        we_cmd = cmd_name.lstrip("/")
        return f"worldedit {we_cmd} {args_str}".strip()

    # Fallback: just use the command as-is
    return cmd


def command_to_training_example(
    cmd_info: dict,
    version: str,
    repo_name: str,
    existing_commands: set,
) -> Optional[dict]:
    """Convert an extracted command into a training example dict.

    Args:
        cmd_info: Dict with at least "command" and "player" keys, as produced
            by extract_commands().
        version: Server version string stored in the example's server_context.
        repo_name: Repository the log came from (recorded in the example).
        existing_commands: Commands already seen; used for deduplication.
            This set is only read here, never modified.

    Returns:
        The example dict, or None when the command is too short, starts with
        a word in SKIP_PREFIXES, or is already in ``existing_commands``.
    """
    # Drop only the command-prefix slash so WorldEdit "//set" stays "/set".
    raw_cmd = cmd_info["command"].removeprefix("/")
    player = cmd_info["player"]

    # Skip empty or very short commands
    if len(raw_cmd) < 2:
        return None

    # Skip plugin-specific commands that aren't vanilla/paper/worldedit.
    words = raw_cmd.split()
    first_word = words[0].lower() if words else ""
    if first_word in SKIP_PREFIXES:
        return None

    # Deduplicate
    if raw_cmd in existing_commands:
        return None

    user_msg = synthesize_user_message(cmd_info["command"], player)

    return {
        "id": f"scraped-{uuid.uuid4().hex[:12]}",
        "source": "scraped_github",
        "category": "command_gen",
        "input": {
            "user_message": user_msg,
            "server_context": {
                "server_type": "paper",
                "version": version,
            },
        },
        "output": {
            "reasoning": f"Extracted from GitHub repo {repo_name} server log.",
            "commands": [raw_cmd],
            "safety_flags": [],
        },
        "metadata": {
            "difficulty": "easy",
            "validated": False,
            "extracted_from": f"github:{repo_name}",
            "risk_level": 3,
        },
    }


def load_existing_commands(seed_path: str) -> set:
    """Load commands from existing dataset for deduplication.

    Reads ``seed_path`` as JSONL and collects every string found under
    ``output.commands``. Blank lines, invalid JSON and records without that
    structure are skipped. Returns an empty set if the file does not exist.
    """
    commands = set()
    if not os.path.exists(seed_path):
        return commands

    with open(seed_path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
            except json.JSONDecodeError:
                continue
            # A valid JSON line need not be an object; ignore anything else.
            output = obj.get("output") if isinstance(obj, dict) else None
            if not isinstance(output, dict):
                continue
            for cmd in output.get("commands") or []:
                if isinstance(cmd, str):
                    commands.add(cmd)
    return commands


# ---------------------------------------------------------------------------
# GitHub search
# ---------------------------------------------------------------------------

def search_github_code(query: str, max_results: int = 100) -> list[dict]:
    """
    Search GitHub code via gh api and return a list of file info dicts.
    Each dict has: repo, path, html_url, sha.

    Stops at ``max_results`` items, on the first failed or empty page, on a
    short page, or after 5 pages, whichever comes first.
    """
    results = []
    per_page = min(max_results, 30)  # page size requested from code search
    # Queries contain spaces and double quotes, so they must be
    # percent-encoded before being placed in the URL.
    encoded_query = quote(query, safe="")
    page = 1

    while len(results) < max_results:
        # gh api uses the REST endpoint
        api_path = (
            f"/search/code?q={encoded_query}&per_page={per_page}&page={page}"
        )
        raw = run_gh(["api", api_path], timeout=30)
        if raw is None:
            break

        try:
            data = json.loads(raw)
        except json.JSONDecodeError:
            break

        items = data.get("items", [])
        if not items:
            break

        for item in items:
            results.append({
                "repo": item.get("repository", {}).get("full_name", ""),
                "path": item.get("path", ""),
                "sha": item.get("sha", ""),
                "html_url": item.get("html_url", ""),
            })

        if len(items) < per_page:
            break

        page += 1
        # Rate limit politeness
        time.sleep(2)

        if page > 5:
            # Safety cap: 5 pages max
            break

    return results[:max_results]


def download_file_content(repo: str, path: str) -> Optional[str]:
    """Download a file from a GitHub repo. Returns text content or None.

    The contents API is queried first to learn the size and download URL.
    Files larger than MAX_FILE_SIZE are skipped. When no download URL is
    given, the blob is fetched by SHA through gh instead. Bytes that are not
    valid UTF-8 are replaced rather than causing the download to fail.
    """
    # First check size via the API. The path may contain spaces or other
    # characters that need percent-encoding ("/" is left as-is).
    api_path = f"/repos/{repo}/contents/{quote(path)}"
    raw = run_gh(["api", api_path], timeout=30)
    if raw is None:
        return None

    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return None
    if not isinstance(data, dict):
        # The API returns a list when the path is a directory.
        return None

    size = data.get("size", 0)
    if size > MAX_FILE_SIZE:
        print(f" [skip] {repo}/{path}: too large ({size / 1024 / 1024:.1f} MB)")
        return None

    download_url = data.get("download_url")
    if not download_url:
        # Try to get via git blob
        sha = data.get("sha", "")
        if not sha:
            return None
        return run_gh(
            ["api", f"/repos/{repo}/git/blobs/{sha}",
             "-H", "Accept: application/vnd.github.raw"],
            timeout=30,
        )

    # Download via curl (gh doesn't handle raw downloads well).
    # --fail makes HTTP errors a non-zero exit instead of returning the
    # error page body as if it were the file.
    try:
        result = subprocess.run(
            ["curl", "-sL", "--fail",
             "--max-filesize", str(MAX_FILE_SIZE), download_url],
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
            timeout=30,
        )
    except (subprocess.SubprocessError, OSError) as e:
        print(f" [error] curl failed for {repo}/{path}: {e}")
        return None

    if result.returncode == 0:
        return result.stdout
    return None


# ---------------------------------------------------------------------------
# Main pipeline
# ---------------------------------------------------------------------------

def main():
    """Run the search -> download -> extract -> save pipeline from the CLI."""
    parser = argparse.ArgumentParser(
        description="Scrape Minecraft server logs from GitHub for training data."
    )
    parser.add_argument(
        "--dry-run", action="store_true",
        help="Search but don't download files.",
    )
    parser.add_argument(
        "--max-repos", type=int, default=50,
        help="Maximum number of repos/files to check (default: 50).",
    )
    parser.add_argument(
        "--output-dir", type=str, default=None,
        help="Output directory (default: data/raw/).",
    )
    args = parser.parse_args()

    # Resolve paths
    script_dir = Path(__file__).resolve().parent
    project_root = script_dir.parent if script_dir.name == "data" else script_dir

    if args.output_dir:
        output_dir = Path(args.output_dir)
    else:
        output_dir = project_root / "data" / "raw"
    output_dir.mkdir(parents=True, exist_ok=True)

    raw_output = output_dir / "scraped_github.jsonl"
    training_output = output_dir / "scraped_training.jsonl"
    seed_path = project_root / "data" / "processed" / "seed_dataset.jsonl"

    # Load existing commands for dedup
    existing_commands = load_existing_commands(str(seed_path))
    print(f"Loaded {len(existing_commands)} existing commands for dedup.")

    # ------------------------------------------------------------------
    # Phase 1: Search GitHub
    # ------------------------------------------------------------------
    print("\n=== Phase 1: Searching GitHub ===")
    all_files: dict[str, dict] = {}  # keyed by repo/path to dedup

    for query in SEARCH_QUERIES:
        print(f"\n Query: {query}")
        results = search_github_code(query, max_results=100)
        print(f" Found {len(results)} results.")
        for r in results:
            # setdefault keeps the first hit for each repo/path.
            all_files.setdefault(f"{r['repo']}/{r['path']}", r)
        time.sleep(2)  # Politeness between queries

    print(f"\nTotal unique files found: {len(all_files)}")

    if args.dry_run:
        print("\n[DRY RUN] Listing files that would be downloaded:")
        for info in list(all_files.values())[:args.max_repos]:
            print(f" {info['repo']}/{info['path']}")
        print(f"\nWould check up to {min(len(all_files), args.max_repos)} files.")
        return

    # ------------------------------------------------------------------
    # Phase 2: Download and process files
    # ------------------------------------------------------------------
    print("\n=== Phase 2: Downloading and processing ===")

    stats = {
        "files_checked": 0,
        "files_with_version": 0,
        "files_accepted": 0,
        "files_rejected_version": 0,
        "files_no_version": 0,
        "total_commands": 0,
        "training_examples": 0,
        "version_distribution": {},
    }

    raw_commands: list[dict] = []
    training_examples: list[dict] = []
    # Copy so dedup across scraped files does not mutate the seed set.
    seen_commands: set = set(existing_commands)

    file_list = list(all_files.values())[:args.max_repos]

    for i, file_info in enumerate(file_list):
        repo = file_info["repo"]
        path = file_info["path"]
        print(f"\n[{i + 1}/{len(file_list)}] {repo}/{path}")
        stats["files_checked"] += 1

        content = download_file_content(repo, path)
        if content is None:
            print(" [skip] Could not download.")
            time.sleep(2)
            continue

        # Detect version
        version = detect_version(content)
        if not version:
            print(" [skip] No version detected in log.")
            stats["files_no_version"] += 1
            time.sleep(2)
            continue

        stats["files_with_version"] += 1
        stats["version_distribution"][version] = (
            stats["version_distribution"].get(version, 0) + 1
        )
        if not version_acceptable(version):
            print(f" [reject] Version {version} is too old (need >= 1.20.5).")
            stats["files_rejected_version"] += 1
            time.sleep(2)
            continue
        print(f" [ok] Version {version}")
        stats["files_accepted"] += 1

        # Extract commands
        commands = extract_commands(content)
        print(f" Extracted {len(commands)} commands.")
        stats["total_commands"] += len(commands)

        for cmd_info in commands:
            # Save raw
            raw_commands.append({
                "repo": repo,
                "path": path,
                "version": version,
                **cmd_info,
            })

            # Convert to training example
            example = command_to_training_example(
                cmd_info, version, repo, seen_commands
            )
            if example:
                training_examples.append(example)
                # Track for dedup
                seen_commands.update(example["output"]["commands"])
                stats["training_examples"] += 1

        time.sleep(2)  # Politeness

    # ------------------------------------------------------------------
    # Phase 3: Save results
    # ------------------------------------------------------------------
    print("\n=== Phase 3: Saving results ===")

    with open(raw_output, "w", encoding="utf-8") as f:
        for entry in raw_commands:
            f.write(json.dumps(entry) + "\n")
    print(f" Raw commands: {raw_output} ({len(raw_commands)} entries)")

    with open(training_output, "w", encoding="utf-8") as f:
        for entry in training_examples:
            f.write(json.dumps(entry) + "\n")
    print(f" Training examples: {training_output} ({len(training_examples)} entries)")

    # ------------------------------------------------------------------
    # Summary
    # ------------------------------------------------------------------
    print("\n" + "=" * 60)
    print("SUMMARY")
    print("=" * 60)
    print(f" Files found (unique): {len(all_files)}")
    print(f" Files checked: {stats['files_checked']}")
    print(f" Files with version detected: {stats['files_with_version']}")
    print(f" Files accepted (>= 1.20.5): {stats['files_accepted']}")
    print(f" Files rejected (old version): {stats['files_rejected_version']}")
    print(f" Files skipped (no version): {stats['files_no_version']}")
    print(f" Total commands extracted: {stats['total_commands']}")
    print(f" Training examples generated: {stats['training_examples']}")
    print("\n Version distribution:")
    for ver, count in sorted(stats["version_distribution"].items()):
        accepted = "ok" if version_acceptable(ver) else "REJECTED"
        print(f" {ver}: {count} files [{accepted}]")
    print("=" * 60)


if __name__ == "__main__":
    main()