Files
Mortdecai/agent/tools/event_dispatcher.py
Mortdecai da8f557219 GPU scheduler, 14-tool architecture, plugin deployment, event dispatcher
GPU Scheduler (gpu.sethpc.xyz):
- Live dashboard with 4 GPUs, training monitor, loss sparklines
- Preset-based job scheduler with 3 triggers (time, finish_training, cost)
- Model selection per GPU, pipeline configuration
- Tool self-play and training pipeline types
- Behind Google OAuth, live-refresh without page reload

Tool Architecture (14 tools):
- 3 new tools: world.nearby_entities, memory.read, memory.write
- 7 script.* tools: write, validate, execute, read, list, delete, schedule
- ScriptManager: full mcfunction datapack CRUD with RCON validation
- Training data: 1,430 tool examples (up from 1,159)

Plugin Deployment (paper-ai-25567):
- WorldGuard 7.0.12, CoreProtect CE 23.1, EssentialsX 2.21.2, Vault 1.7.3
- Fresh greenfield world reset
- 104 RCON-validated plugin training examples

Event Dispatcher:
- Watches server log for deaths, joins, advancements, PvP kills
- Configurable trigger probability and cooldowns per event type
- Deployed to dev server, fires god_system prompts on events
- 21 event-response training examples

Training Infrastructure:
- train_lora.py: --save-steps 50, --resume from checkpoint
- run_training.sh: stops Ollama, activates conda, restarts after
- Passwordless sudo for ollama services on steel141
- Dev server added to MCSManager with autoStart

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-21 03:14:45 -04:00

341 lines
14 KiB
Python

"""
Event Dispatcher — watches server log events and triggers AI God responses.
Parses deaths (including PvP kills and mass deaths), joins, leaves,
advancements, and challenges from the Minecraft server log. Chat blasphemy
is listed as an intended event source, but no handler for it appears in
this module. Each event type has a configurable trigger probability;
cooldowns are applied globally and per player. When fired, the event is
formatted as a god_system prompt and sent to the model.
Usage:
from agent.tools.event_dispatcher import EventDispatcher
dispatcher = EventDispatcher(config, llm_callback, rcon_fn)
dispatcher.process_log_line(line) # call for each new log line
Config keys:
event_triggers_enabled: bool (default true)
event_cooldown_global: int seconds (default 30)
event_cooldown_per_player: int seconds (default 120)
event_triggers: dict of event_type -> {probability, cooldown_override, enabled}
"""
import json
import logging
import random
import re
import threading
import time
from collections import defaultdict
from typing import Any, Callable, Dict, List, Optional
log = logging.getLogger(__name__)
# ── Event patterns ─────────────────────────────────────────────────────────
# Each entry is (compiled message regex, internal death type, description
# template).  Capture group 1 is the victim; when a second group exists it is
# the killer, which the template refers to as "{1}".
DEATH_PATTERNS = [
    (re.compile(regex), death_type, template)
    for regex, death_type, template in (
        (r'(\w+) fell from a high place', "fall", "fell from a high place"),
        (r'(\w+) hit the ground too hard', "fall", "hit the ground too hard"),
        (r'(\w+) was slain by (\w+)', "mob_kill", "was slain by {1}"),
        (r'(\w+) was shot by (\w+)', "ranged_kill", "was shot by {1}"),
        (r'(\w+) drowned', "drown", "drowned"),
        (r'(\w+) tried to swim in lava', "lava", "tried to swim in lava"),
        (r'(\w+) burned to death', "fire", "burned to death"),
        (r'(\w+) went up in flames', "fire", "went up in flames"),
        (r'(\w+) blew up', "explosion", "blew up"),
        (r'(\w+) was blown up by (\w+)', "explosion", "was blown up by {1}"),
        (r'(\w+) suffocated in a wall', "suffocate", "suffocated in a wall"),
        (r'(\w+) starved to death', "starve", "starved to death"),
        (r'(\w+) was killed by (\w+)', "pvp_or_mob", "was killed by {1}"),
        (r'(\w+) was pricked to death', "cactus", "was pricked to death"),
        (r'(\w+) withered away', "wither", "withered away"),
        (r'(\w+) was impaled by (\w+)', "trident", "was impaled by {1}"),
        (r'(\w+) fell out of the world', "void", "fell out of the world"),
    )
]

# Session and progress messages; group 1 is the player name, and for the
# advancement/challenge patterns group 2 is the bracketed title.
JOIN_PATTERN = re.compile(r'(\w+) joined the game')
LEAVE_PATTERN = re.compile(r'(\w+) left the game')
ADVANCEMENT_PATTERN = re.compile(r'(\w+) has made the advancement \[(.+?)\]')
CHALLENGE_PATTERN = re.compile(r'(\w+) has completed the challenge \[(.+?)\]')

# Death-message phrases that can name another player as the killer.
PVP_KILLERS = {
    "was slain by",
    "was shot by",
    "was killed by",
}
# ── Default trigger config ─────────────────────────────────────────────────
# Maps event type -> {"probability", "enabled"}.  Every default trigger starts
# enabled; only the firing probability differs between event types.
DEFAULT_TRIGGERS = {
    event_type: {"probability": probability, "enabled": True}
    for event_type, probability in (
        ("death_fall", 0.40),
        ("death_lava", 0.35),
        ("death_mob", 0.25),
        ("death_pvp", 0.60),
        ("death_drown", 0.20),
        ("death_fire", 0.25),
        ("death_explosion", 0.30),
        ("death_void", 0.50),
        ("death_starve", 0.30),
        ("death_other", 0.15),
        ("join_first", 1.00),
        ("join_returning", 0.30),
        ("leave_long", 0.20),      # played 2+ hours
        ("advancement", 0.50),
        ("challenge", 0.70),
        ("mass_death", 0.80),      # 2+ deaths in 30s
    )
}

# Timing knobs, all in seconds.
GLOBAL_COOLDOWN = 30     # minimum gap between any two event triggers
PLAYER_COOLDOWN = 120    # minimum gap between triggers for the same player
MASS_DEATH_WINDOW = 30   # look-back window used to detect mass deaths
class EventDispatcher:
    """Turn Minecraft server log lines into "god_system" LLM prompts.

    Recognised events (death, mass death, join, long-session leave,
    advancement, challenge) must pass a global cooldown, a per-player
    cooldown and a per-event-type probability roll.  Events that pass are
    handed to ``llm_callback`` on a daemon thread; the returned message is
    broadcast with ``tellraw`` and the returned commands are run through
    ``rcon_fn``.
    """

    # Upper bound on LLM-supplied commands executed for a single event.
    _MAX_COMMANDS = 8
    # Session length (minutes) from which a leave counts as "leave_long".
    _LONG_SESSION_MINUTES = 120
    # Extracts the message part of a "[HH:MM:SS INFO]: message" log line.
    _LOG_MESSAGE_RE = re.compile(r'INFO\]: (.+)$')
    # Player chat ("<Name> text"), optionally carrying the "[Not Secure]"
    # marker.  Chat is player-controlled text and must never be parsed as a
    # server event.
    _CHAT_LINE_RE = re.compile(r'^(?:\[Not Secure\] )?<[^>]+>')
    # Internal death type -> trigger key; anything missing is "death_other".
    _DEATH_EVENT_TYPES = {
        "fall": "death_fall",
        "lava": "death_lava",
        "drown": "death_drown",
        "fire": "death_fire",
        "explosion": "death_explosion",
        "void": "death_void",
        "starve": "death_starve",
        "mob_kill": "death_mob",
        "ranged_kill": "death_mob",
        "pvp_or_mob": "death_mob",
    }

    def __init__(self, config: dict, llm_callback: Callable, rcon_fn: Callable):
        """
        config: server config dict
        llm_callback: function(prompt: str, mode: str) -> dict with {message, commands, reasoning}
        rcon_fn: function(command: str) -> str
        """
        self.config = config
        self.llm_callback = llm_callback
        self.rcon = rcon_fn
        self.enabled = config.get("event_triggers_enabled", True)

        # Copy every default entry: a shallow dict() copy would share the
        # inner dicts, so user overrides would leak into DEFAULT_TRIGGERS and
        # into every other dispatcher instance.
        self.triggers = {name: dict(spec) for name, spec in DEFAULT_TRIGGERS.items()}
        for name, override in (config.get("event_triggers") or {}).items():
            if name in self.triggers:
                self.triggers[name].update(override)

        self.global_cooldown = config.get("event_cooldown_global", GLOBAL_COOLDOWN)
        self.player_cooldown = config.get("event_cooldown_per_player", PLAYER_COOLDOWN)

        # State
        self._lock = threading.Lock()
        self._last_global_trigger = 0.0
        self._last_player_trigger: Dict[str, float] = defaultdict(float)
        self._recent_deaths: List[tuple] = []  # (timestamp, player, cause)
        self._player_join_times: Dict[str, float] = {}
        self._known_players: set = set()  # players we've seen before
        self._known_players_path = self._resolve_known_players_path(config)
        self._load_known_players()

    @staticmethod
    def _resolve_known_players_path(config: dict) -> str:
        """Return the known-players file path, or "" to disable persistence.

        An explicit ``event_known_players_path`` wins.  Otherwise the path is
        derived from ``log_path`` by swapping the ``logs/latest.log`` suffix.
        If that substitution changes nothing, the derived path would be the
        log file itself, and saving would overwrite the server log, so
        persistence is disabled instead.
        """
        explicit = config.get("event_known_players_path")
        if explicit:
            return explicit
        log_path = config.get("log_path", "") or ""
        derived = log_path.replace("logs/latest.log", "aigod_known_players.json")
        return derived if derived != log_path else ""

    def _load_known_players(self):
        """Load the persisted player set; start empty if it is missing or unreadable."""
        self._known_players = set()
        if not self._known_players_path:
            return
        try:
            with open(self._known_players_path, encoding="utf-8") as f:
                loaded = json.load(f)
        except FileNotFoundError:
            return  # first run: nothing persisted yet
        except (OSError, ValueError) as e:
            # ValueError covers json.JSONDecodeError and decoding errors.
            log.warning("[event] could not load known players from %s: %s",
                        self._known_players_path, e)
            return
        if isinstance(loaded, list):
            self._known_players = {name for name in loaded if isinstance(name, str)}
        else:
            log.warning("[event] ignoring malformed known-players file %s",
                        self._known_players_path)

    def _save_known_players(self):
        """Persist the known-player set (best effort; failures are logged)."""
        if not self._known_players_path:
            return
        try:
            with open(self._known_players_path, "w", encoding="utf-8") as f:
                json.dump(sorted(self._known_players), f)
        except OSError as e:
            log.warning("[event] could not save known players to %s: %s",
                        self._known_players_path, e)

    def _can_trigger(self, player: str) -> bool:
        """Check global and per-player cooldowns (without consuming them)."""
        now = time.time()
        with self._lock:
            if now - self._last_global_trigger < self.global_cooldown:
                return False
            if now - self._last_player_trigger[player] < self.player_cooldown:
                return False
            return True

    def _mark_triggered(self, player: str):
        """Record "now" as the last trigger time, globally and for ``player``."""
        now = time.time()
        with self._lock:
            self._last_global_trigger = now
            self._last_player_trigger[player] = now

    def _try_reserve(self, player: str) -> bool:
        """Atomically check both cooldowns and, if clear, start them.

        Checking and marking under one lock acquisition closes the window in
        which a second event could pass ``_can_trigger`` before the worker
        thread of the first one had called ``_mark_triggered``.
        """
        now = time.time()
        with self._lock:
            if now - self._last_global_trigger < self.global_cooldown:
                return False
            if now - self._last_player_trigger[player] < self.player_cooldown:
                return False
            self._last_global_trigger = now
            self._last_player_trigger[player] = now
            return True

    def _should_fire(self, event_type: str) -> bool:
        """Roll probability for this event type."""
        trigger = self.triggers.get(event_type, {"probability": 0.0, "enabled": False})
        if not trigger.get("enabled", False):
            return False
        return random.random() < trigger.get("probability", 0.0)

    def _dispatch(self, event_text: str, player: str, event_type: str) -> bool:
        """Apply cooldown and probability gates, then fire on a daemon thread.

        Returns True when a worker thread was started.
        """
        if not self._can_trigger(player):  # cheap pre-check, skips the roll
            return False
        if not self._should_fire(event_type):
            return False
        if not self._try_reserve(player):
            return False
        threading.Thread(
            target=self._fire_event,
            args=(event_text, player, event_type),
            daemon=True,
        ).start()
        return True

    @staticmethod
    def _tellraw_command(prefix: str, message: str) -> str:
        """Build a ``tellraw @a`` command with a correctly escaped JSON body.

        ``json.dumps`` escapes quotes, backslashes and control characters, so
        arbitrary model output cannot break out of the JSON string.
        """
        payload = json.dumps(
            {"text": f"{prefix} {message}"},
            ensure_ascii=False,
            separators=(",", ":"),
        )
        return f"tellraw @a {payload}"

    @staticmethod
    def _describe_death(desc_template: str, match: "re.Match") -> str:
        """Fill a death description template from the regex match groups.

        Numeric fields such as ``{1}`` are positional in ``str.format``, so
        the groups are passed positionally: ``{0}`` is the victim and ``{1}``
        the killer.
        """
        return desc_template.format(*match.groups())

    def _fire_event(self, event_text: str, player: str, event_type: str):
        """Send the event to the LLM and execute the response."""
        self._mark_triggered(player)
        log.info("[event] firing %s for %s: %s", event_type, player, event_text)
        try:
            result = self.llm_callback(event_text, "god_system")
            if not result:
                return
            message = result.get("message") or ""
            commands = result.get("commands") or []
        except Exception as e:
            log.error("[event] LLM callback failed for %s: %s", event_type, e)
            return
        # Slicing a str would yield single characters; only accept sequences.
        if not isinstance(commands, (list, tuple)):
            commands = []

        # Send God's message
        if message:
            prefix = self.config.get("god_chat_prefix", "[\u00a7d\u00a7lGOD\u00a7r]")
            try:
                self.rcon(self._tellraw_command(prefix, message))
            except Exception as e:
                # If the broadcast fails, skip the commands as well.
                log.error("[event] failed to send message for %s: %s", event_type, e)
                return

        # Execute commands (capped)
        executed = 0
        for cmd in commands[:self._MAX_COMMANDS]:
            if not isinstance(cmd, str) or not cmd.strip():
                continue
            try:
                self.rcon(cmd.strip())
                executed += 1
            except Exception as e:
                log.warning("[event] command failed: %s: %s", cmd, e)
        log.info("[event] %s: sent message + %d commands", event_type, executed)

    # ── Log line processing ────────────────────────────────────────────────
    def process_log_line(self, line: str):
        """Parse a log line and dispatch any events."""
        if not self.enabled:
            return
        # Strip the log prefix to get the message
        # Format: [HH:MM:SS INFO]: message
        found = self._LOG_MESSAGE_RE.search(line)
        if not found:
            return
        msg = found.group(1).strip()
        # The event regexes are unanchored, so a chat line such as
        # "<Alice> Bob joined the game" would otherwise spoof an event.
        if self._CHAT_LINE_RE.match(msg):
            return
        self._check_death(msg)
        self._check_join(msg)
        self._check_leave(msg)
        self._check_advancement(msg)

    def _check_death(self, msg: str):
        """Detect a death message, track it for mass deaths, maybe fire."""
        now = time.time()
        for pattern, death_type, desc_template in DEATH_PATTERNS:
            m = pattern.search(msg)
            if not m:
                continue
            player = m.group(1)
            killer = m.group(2) if (m.lastindex or 0) >= 2 else None
            description = self._describe_death(desc_template, m)

            # Track for mass death detection; snapshot everything needed
            # later while still holding the lock.
            with self._lock:
                self._recent_deaths.append((now, player, death_type))
                self._recent_deaths = [
                    entry for entry in self._recent_deaths
                    if now - entry[0] < MASS_DEATH_WINDOW
                ]
                recent_count = len(self._recent_deaths)
                # dict.fromkeys de-duplicates while keeping death order.
                victims = list(dict.fromkeys(p for _, p, _ in self._recent_deaths))

            # Determine event type: a killer we know as a player means PvP.
            if killer and killer in self._known_players:
                event_type = "death_pvp"
            else:
                event_type = self._DEATH_EVENT_TYPES.get(death_type, "death_other")

            # Mass death override
            if recent_count >= 2 and self._should_fire("mass_death") and self._try_reserve(player):
                event_text = f"[EVENT:MASS_DEATH] {recent_count} players died within {MASS_DEATH_WINDOW} seconds: {', '.join(victims)}"
                with self._lock:
                    self._recent_deaths.clear()
                threading.Thread(
                    target=self._fire_event,
                    args=(event_text, player, "mass_death"),
                    daemon=True,
                ).start()
                return

            event_text = f"[EVENT:DEATH] Player {player} {description}"
            if killer:
                event_text += f" (killer: {killer})"
            self._dispatch(event_text, player, event_type)
            return  # first matching pattern wins

    def _check_join(self, msg: str):
        """Detect a join, remember first-time players, maybe fire."""
        m = JOIN_PATTERN.search(msg)
        if not m:
            return
        player = m.group(1)
        self._player_join_times[player] = time.time()
        if player not in self._known_players:
            self._known_players.add(player)
            self._save_known_players()
            event_type = "join_first"
            event_text = f"[EVENT:JOIN] Player {player} joined the game for the first time"
        else:
            event_type = "join_returning"
            event_text = f"[EVENT:JOIN] Player {player} joined the game (returning player)"
        self._dispatch(event_text, player, event_type)

    def _check_leave(self, msg: str):
        """Detect a leave; only sessions of 2+ hours can fire "leave_long"."""
        m = LEAVE_PATTERN.search(msg)
        if not m:
            return
        player = m.group(1)
        join_time = self._player_join_times.pop(player, None)
        if join_time is None:
            return  # join happened before we started watching
        session_minutes = (time.time() - join_time) / 60
        if session_minutes < self._LONG_SESSION_MINUTES:
            return
        event_text = f"[EVENT:LEAVE] Player {player} left the game after {session_minutes:.0f} minutes"
        self._dispatch(event_text, player, "leave_long")

    def _check_advancement(self, msg: str):
        """Detect an advancement or challenge completion, maybe fire."""
        m = ADVANCEMENT_PATTERN.search(msg)
        event_type = "advancement"
        if not m:
            m = CHALLENGE_PATTERN.search(msg)
            event_type = "challenge"
        if not m:
            return
        player = m.group(1)
        advancement = m.group(2)
        event_text = f"[EVENT:ADVANCEMENT] Player {player} earned [{advancement}]"
        self._dispatch(event_text, player, event_type)