Add sudo translator agent with whitelist and user lock
- New sudo chat trigger: 'sudo <request>' - Authorized user only (configurable, default slingshooter08) - Uses command_model to translate natural language to JSON commands - Executes commands through existing whitelist/validator pipeline - No God persona or speech call in sudo mode - Added sudo_enabled/sudo_user/sudo_max_commands config keys - Added common give-item alias normalization (wood->oak_log, bed->white_bed) - Updated README with sudo usage and config docs
This commit is contained in:
+136
-4
@@ -31,6 +31,10 @@ BIBLE_PATTERNS = [
|
||||
re.compile(r'\[.*?\]: <(\w+)> [Bb]ible\s*$'),
|
||||
]
|
||||
|
||||
SUDO_PATTERNS = [
|
||||
re.compile(r'\[.*?\]: <(\w+)> [Ss]udo (.+)'),
|
||||
]
|
||||
|
||||
JOIN_PATTERN = re.compile(r'\[.*?\]: (\w+) joined the game')
|
||||
LEAVE_PATTERN = re.compile(r'\[.*?\]: (\w+) left the game')
|
||||
|
||||
@@ -603,6 +607,20 @@ COMMANDS_SYSTEM_PROMPT = (
|
||||
+ ITEM_LIBRARY
|
||||
)
|
||||
|
||||
SUDO_COMMANDS_SYSTEM_PROMPT = (
|
||||
"You are a Minecraft command translator. Convert a player's natural-language request into "
|
||||
"Minecraft server commands. You do NOT roleplay.\n\n"
|
||||
"Respond ONLY with valid JSON:\n"
|
||||
"{\"commands\": [\"cmd1\", \"cmd2\"]}\n\n"
|
||||
"Rules:\n"
|
||||
"- Use commands from this whitelist only: give, effect, xp, tp, time, weather, execute, kill, summon, tellraw, worldborder.\n"
|
||||
"- If the request cannot be mapped safely, return commands: [].\n"
|
||||
"- If player says 'me' or 'my', target the requesting player.\n"
|
||||
"- For give syntax: give <player> minecraft:<item_id> <count>\n"
|
||||
"- Count is last. Namespace minecraft: is required.\n"
|
||||
"- Return commands only. No commentary.\n"
|
||||
)
|
||||
|
||||
def build_message_system_prompt(config) -> str:
|
||||
base = (
|
||||
"You are God in a Minecraft server. You are benevolent but just. "
|
||||
@@ -816,18 +834,34 @@ def fix_give_command(cmd: str) -> str:
|
||||
return cmd
|
||||
player, arg2, arg3, rest = m.group(1), m.group(2), m.group(3), m.group(4)
|
||||
|
||||
def normalize_item(item: str) -> str:
|
||||
# Strip namespace for alias mapping, then re-apply
|
||||
raw = item.replace("minecraft:", "")
|
||||
aliases = {
|
||||
"wood": "oak_log",
|
||||
"logs": "oak_log",
|
||||
"log": "oak_log",
|
||||
"planks": "oak_planks",
|
||||
"plank": "oak_planks",
|
||||
"food": "bread",
|
||||
"heal": "golden_apple",
|
||||
"healing": "golden_apple",
|
||||
"bed": "white_bed",
|
||||
}
|
||||
raw = aliases.get(raw, raw)
|
||||
return f"minecraft:{raw}"
|
||||
|
||||
# Detect transposed order: give player <number> <item>
|
||||
if arg2.isdigit():
|
||||
count, item = arg2, arg3
|
||||
if not item.startswith("minecraft:"):
|
||||
item = f"minecraft:{item}"
|
||||
item = normalize_item(item)
|
||||
fixed = f"give {player} {item} {count}{rest}"
|
||||
log.warning(f"Fixed transposed give: '{cmd}' -> '{fixed}'")
|
||||
return fixed
|
||||
|
||||
# Detect missing namespace: give player <item_without_prefix> <count>
|
||||
if not arg2.startswith("minecraft:") and not arg2.startswith("{"):
|
||||
item = f"minecraft:{arg2}"
|
||||
if not arg2.startswith("{"):
|
||||
item = normalize_item(arg2)
|
||||
fixed = f"give {player} {item} {arg3}{rest}"
|
||||
log.warning(f"Fixed missing namespace: '{cmd}' -> '{fixed}'")
|
||||
return fixed
|
||||
@@ -910,6 +944,86 @@ def execute_response(response, context, config, praying_player=None):
|
||||
elif "clear" in resolved: config["_weather_state"] = "clear"
|
||||
time.sleep(0.3)
|
||||
|
||||
def process_sudo(player, prompt, config):
|
||||
"""
|
||||
sudo translator mode:
|
||||
- no God persona
|
||||
- no speech generation
|
||||
- translates natural language to whitelisted commands
|
||||
- only authorized user can execute
|
||||
"""
|
||||
if not config.get("sudo_enabled", True):
|
||||
return
|
||||
|
||||
sudo_user = config.get("sudo_user", "slingshooter08")
|
||||
if player != sudo_user:
|
||||
# Keep this private and quiet
|
||||
rcon(
|
||||
f'tellraw {player} {{"text":"[SUDO] Unauthorized.","color":"red"}}',
|
||||
config["rcon_host"], config["rcon_port"], config["rcon_password"]
|
||||
)
|
||||
return
|
||||
|
||||
# Immediate private ack
|
||||
rcon(
|
||||
f'tellraw {player} {{"text":"[SUDO] Translating...","color":"gray","italic":true}}',
|
||||
config["rcon_host"], config["rcon_port"], config["rcon_password"]
|
||||
)
|
||||
|
||||
online = players_online(config)
|
||||
context_hint = (
|
||||
f"Requesting player: {player}\n"
|
||||
f"Online players: {', '.join(online) or 'none'}\n"
|
||||
f"Natural language request: {prompt}\n"
|
||||
)
|
||||
|
||||
command_model = config.get("command_model", config["model"])
|
||||
try:
|
||||
content = _llm_call(
|
||||
model=command_model,
|
||||
system=SUDO_COMMANDS_SYSTEM_PROMPT,
|
||||
user=context_hint,
|
||||
config=config,
|
||||
fmt="json",
|
||||
temperature=0.1,
|
||||
max_tokens=180,
|
||||
)
|
||||
parsed = _parse_llm_json(content)
|
||||
commands = parsed.get("commands") or []
|
||||
except Exception as e:
|
||||
log.error(f"SUDO translation failed: {e}")
|
||||
rcon(
|
||||
f'tellraw {player} {{"text":"[SUDO] Translation failed.","color":"red"}}',
|
||||
config["rcon_host"], config["rcon_port"], config["rcon_password"]
|
||||
)
|
||||
return
|
||||
|
||||
max_cmds = config.get("sudo_max_commands", 3)
|
||||
commands = commands[:max_cmds]
|
||||
|
||||
if not commands:
|
||||
rcon(
|
||||
f'tellraw {player} {{"text":"[SUDO] No safe command generated.","color":"yellow"}}',
|
||||
config["rcon_host"], config["rcon_port"], config["rcon_password"]
|
||||
)
|
||||
return
|
||||
|
||||
# Show translated command(s) privately
|
||||
safe_preview = " | ".join(commands).replace("\\", "\\\\").replace('"', '\\"')
|
||||
rcon(
|
||||
f'tellraw {player} {{"text":"[SUDO] {safe_preview}","color":"dark_gray","italic":true}}',
|
||||
config["rcon_host"], config["rcon_port"], config["rcon_password"]
|
||||
)
|
||||
|
||||
for cmd in commands:
|
||||
resolved, is_safe = validate_command(cmd, online, player)
|
||||
if not is_safe:
|
||||
continue
|
||||
log.info(f"SUDO execute: {resolved}")
|
||||
result = rcon(resolved, config["rcon_host"], config["rcon_port"], config["rcon_password"])
|
||||
log.info(f"SUDO result: {result!r}")
|
||||
time.sleep(0.2)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Prayer handler
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -1097,6 +1211,24 @@ def main():
|
||||
# Feed every line into the rolling log buffer
|
||||
add_log_event(line)
|
||||
|
||||
# sudo translator
|
||||
matched = False
|
||||
for pat in SUDO_PATTERNS:
|
||||
m = pat.search(line)
|
||||
if m:
|
||||
player = m.group(1)
|
||||
prompt = m.group(2).strip()
|
||||
log.info(f"SUDO from {player}: {prompt}")
|
||||
try:
|
||||
process_sudo(player, prompt, config)
|
||||
except Exception as e:
|
||||
log.error(f"Error processing sudo: {e}", exc_info=True)
|
||||
matched = True
|
||||
break
|
||||
|
||||
if matched:
|
||||
continue
|
||||
|
||||
# /pray
|
||||
matched = False
|
||||
for pat in PRAY_PATTERNS:
|
||||
|
||||
Reference in New Issue
Block a user