Files
Seth af5cb4df2a Semver rename: mortdecai:0.4.0, mortdecai:0.5.0
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-20 21:37:36 -04:00

608 lines
24 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Mortdecai Ollama Gateway — authenticated proxy with power metering.
Sits in front of Ollama, provides:
- API key authentication
- Power/cost tracking (GPU utilization × TDP × electricity rate)
- Usage dashboard
- Spending cap enforcement
- Health check endpoint
Usage:
python3 gateway.py
OLLAMA_URL=http://localhost:11434 API_KEY=mk_test python3 gateway.py
"""
import hashlib
import hmac
import html
import json
import os
import signal
import subprocess
import threading
import time
import uuid
from http.server import BaseHTTPRequestHandler, HTTPServer, ThreadingHTTPServer
from urllib.parse import parse_qs, urlparse

import requests
# --- Config ---
# Each setting is read from the environment, falling back to the default shown.
OLLAMA_URL = os.getenv("OLLAMA_URL", "http://localhost:11434")
LISTEN_PORT = int(os.getenv("GATEWAY_PORT", "8434"))
# NOTE: the default key is a hard-coded placeholder; set API_KEY before exposing the port.
API_KEY = os.getenv("API_KEY", "mk_mortdecai_default")
STATS_FILE = os.getenv("STATS_FILE", "/var/lib/mortdecai-gateway/stats.json")
CONFIG_FILE = os.getenv("CONFIG_FILE", "/var/lib/mortdecai-gateway/cost_config.json")

# Baseline cost model. _load_cost_config() layers CONFIG_FILE and then the
# upper-cased env vars (e.g. ELECTRICITY_RATE) on top; each default's Python
# type is also the type env/POST values are converted to.
_DEFAULT_COST_CONFIG = dict(
    electricity_rate=0.15,  # $/kWh
    gpu_idle_watts=15,  # GPU at idle
    gpu_load_watts=54,  # GPU during inference
    system_idle_watts=45,  # Whole system idle (CPU/RAM/fans/PSU)
    system_inference_watts=65,  # Whole system during inference
    billing_mode="marginal",  # "marginal" = only extra watts; "dedicated" = all uptime
    base_rate_per_hour=0.00,  # $/hr for keeping machine on (dedicated mode only)
    spending_cap=10.00,  # $ before refusing requests
    labor_rate_per_hour=0.00,  # $/hr for operator's time (setup, maintenance)
    profit_margin=0.00,  # multiplier (0.10 = 10% markup)
    labor_hours_logged=0.0,  # total hours spent on setup/maintenance
)
def _load_cost_config():
    """Build the active cost config: defaults, then CONFIG_FILE, then env vars.

    Env var names are the upper-cased default keys (e.g. ``SPENDING_CAP``);
    each value is converted with the type of its default. A missing config
    file is normal. An unreadable or non-object file, or an env value that
    does not convert, is reported and skipped rather than silently ignored.

    Returns:
        dict: a fresh dict (safe for callers to mutate).
    """
    config = dict(_DEFAULT_COST_CONFIG)

    # Layer 1: JSON file written by _save_cost_config / the operator.
    try:
        with open(CONFIG_FILE) as f:
            from_file = json.load(f)
    except FileNotFoundError:
        from_file = {}
    except (OSError, ValueError) as e:
        print(f"Cost config load failed ({CONFIG_FILE}): {e}; ignoring file")
        from_file = {}
    if isinstance(from_file, dict):
        config.update(from_file)
    else:
        print(f"Cost config {CONFIG_FILE} is not a JSON object; ignoring file")

    # Layer 2: environment variables override the file.
    for key, default in _DEFAULT_COST_CONFIG.items():
        env_key = key.upper()
        raw = os.environ.get(env_key)
        if raw is None:
            continue
        try:
            config[key] = type(default)(raw)
        except ValueError:
            print(f"Ignoring {env_key}={raw!r}: expected {type(default).__name__}")
    return config
def _save_cost_config(config):
    """Persist *config* to CONFIG_FILE as indented JSON (best effort).

    The JSON is written to a temporary sibling file and renamed over
    CONFIG_FILE, so a crash mid-write cannot leave a truncated config behind.
    Failures are printed rather than raised.
    """
    directory = os.path.dirname(CONFIG_FILE)
    # pid + thread id keep two concurrent saves from sharing one temp file.
    tmp_path = f"{CONFIG_FILE}.{os.getpid()}.{threading.get_ident()}.tmp"
    try:
        if directory:  # bare file name lives in the CWD; os.makedirs("") would raise
            os.makedirs(directory, exist_ok=True)
        with open(tmp_path, "w") as f:
            json.dump(config, f, indent=2)
        os.replace(tmp_path, CONFIG_FILE)
    except (OSError, TypeError, ValueError) as e:
        print(f"Cost config save failed: {e}")
        try:
            os.remove(tmp_path)
        except OSError:
            pass
# Live cost configuration, resolved once at import; POST /config mutates it in place.
COST_CONFIG = _load_cost_config()

# --- Dual Ledger ---
LEDGER_FILE = os.getenv("LEDGER_FILE", "/var/lib/mortdecai-gateway/ledger.jsonl")
# Salt mixed into _ledger_hash; presumably shared with the client so it can
# re-verify entries — confirm. The default is a placeholder.
LEDGER_SECRET = os.getenv("LEDGER_SECRET", "change_me_shared_secret")
# Seth's server endpoint for transaction logging; an empty value disables callbacks.
CALLBACK_URL = os.getenv("CALLBACK_URL", "")
# Serializes access to LEDGER_FILE.
_ledger_lock = threading.Lock()
def _ledger_hash(entry):
    """Return a 16-hex-char fingerprint of a ledger entry's billable fields.

    The fields id, tokens_in, tokens_out, duration and cost are joined with "|",
    LEDGER_SECRET is appended, and the SHA-256 hex digest is truncated to 16
    characters. Any change to this format invalidates every stored hash.
    NOTE(review): this is a salted plain hash, not an HMAC, and only 64 bits.

    Raises:
        KeyError: if *entry* lacks one of the hashed fields.
    """
    hashed_fields = ("id", "tokens_in", "tokens_out", "duration", "cost")
    parts = [str(entry[field]) for field in hashed_fields]
    parts.append(LEDGER_SECRET)
    digest = hashlib.sha256("|".join(parts).encode()).hexdigest()
    return digest[:16]
def _ledger_write(entry):
    """Append *entry* to LEDGER_FILE as one JSON line (best effort).

    Holds _ledger_lock for the whole append. Failures are printed, not raised,
    so a ledger problem never fails the request that produced the entry.
    """
    with _ledger_lock:
        try:
            directory = os.path.dirname(LEDGER_FILE)
            if directory:  # bare file name lives in the CWD; os.makedirs("") would raise
                os.makedirs(directory, exist_ok=True)
            with open(LEDGER_FILE, "a") as f:
                f.write(json.dumps(entry) + "\n")
        except Exception as e:
            print(f"Ledger write failed: {e}")
def _ledger_callback(entry):
    """POST *entry* to CALLBACK_URL so the client can cross-verify the ledger.

    No-op when CALLBACK_URL is empty. Network errors and HTTP error statuses
    are printed, never raised: _ledger_record runs this on a daemon thread and
    inference must not depend on the callback being up.
    """
    if not CALLBACK_URL:
        return
    try:
        resp = requests.post(
            CALLBACK_URL,
            json=entry,
            headers={"Content-Type": "application/json"},
            timeout=5,
        )
        resp.raise_for_status()
    except requests.RequestException as e:
        print(f"Ledger callback failed for entry {entry.get('id', '?')}: {e}")
def _ledger_record(tokens_in, tokens_out, duration, cost, energy_wh, model):
    """Create, hash, persist and forward one ledger entry, and return it.

    The entry is appended to LEDGER_FILE synchronously (via _ledger_write) and
    sent to CALLBACK_URL on a daemon thread (via _ledger_callback), so a slow
    callback never delays the response. Values are rounded before hashing,
    so the stored hash matches the stored numbers.
    """
    entry = {
        "id": str(uuid.uuid4())[:12],
        # gmtime(): the trailing "Z" declares UTC; local time would be mislabelled.
        "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        "tokens_in": tokens_in,
        "tokens_out": tokens_out,
        "duration": round(duration, 3),
        "cost": round(cost, 8),
        "energy_wh": round(energy_wh, 4),
        "model": model,
        "billing_mode": COST_CONFIG["billing_mode"],
    }
    entry["hash"] = _ledger_hash(entry)
    _ledger_write(entry)
    # Send to client in background
    threading.Thread(target=_ledger_callback, args=(entry,), daemon=True).start()
    return entry
def _ledger_load():
    """Return every ledger entry in LEDGER_FILE, oldest first.

    A missing ledger yields []. Lines that are not a JSON object, such as a
    partially written final line, are reported and skipped. Previously one bad
    line silently ended the read.

    Returns:
        list[dict]: parsed entries.
    """
    entries = []
    with _ledger_lock:  # don't read while _ledger_write is mid-append
        try:
            with open(LEDGER_FILE) as f:
                for lineno, line in enumerate(f, 1):
                    if not line.strip():
                        continue
                    try:
                        obj = json.loads(line)
                    except ValueError:
                        obj = None
                    if isinstance(obj, dict):
                        entries.append(obj)
                    else:
                        print(f"Ledger line {lineno} is not a JSON object; skipped")
        except FileNotFoundError:
            pass
        except OSError as e:
            print(f"Ledger read failed: {e}")
    return entries
def _ledger_verify(entries):
    """Check each entry's stored "hash" against a freshly computed _ledger_hash.

    Entries that are not dicts, or that lack a hashed field, count as invalid.
    Previously such entries raised and aborted the whole verification.

    Returns:
        dict: {"total", "valid", "invalid", "invalid_ids"}; invalid_ids uses
        "?" for entries without an id.
    """
    results = {"total": len(entries), "valid": 0, "invalid": 0, "invalid_ids": []}
    for entry in entries:
        try:
            ok = entry.get("hash") == _ledger_hash(entry)
        except (AttributeError, KeyError, TypeError):
            ok = False
        if ok:
            results["valid"] += 1
        else:
            results["invalid"] += 1
            results["invalid_ids"].append(entry.get("id", "?") if isinstance(entry, dict) else "?")
    return results
# --- Stats tracking ---
# Running usage totals; _load_stats() merges saved values over these defaults.
# Every read-modify-write of _stats must hold _stats_lock.
_stats_lock = threading.Lock()
_stats = {
    "total_requests": 0,
    "total_tokens_in": 0,
    "total_tokens_out": 0,
    "total_inference_seconds": 0,
    "total_energy_wh": 0.0,
    "total_cost": 0.0,
    # gmtime(): the trailing "Z" declares UTC; local time would be mislabelled.
    "started_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
    "last_request_at": None,
    "requests_rejected": 0,
}
def _load_stats():
    """Merge counters saved in STATS_FILE into the in-memory _stats dict.

    A missing file is normal on first start. An unreadable, malformed or
    non-object file is reported and ignored. Note that the counters then
    start from their defaults, and _check_budget compares against total_cost.
    """
    try:
        with open(STATS_FILE) as f:
            saved = json.load(f)
    except FileNotFoundError:
        return
    except (OSError, ValueError) as e:
        print(f"Stats load failed ({STATS_FILE}): {e}; starting from defaults")
        return
    if not isinstance(saved, dict):
        print(f"Stats file {STATS_FILE} is not a JSON object; starting from defaults")
        return
    _stats.update(saved)  # mutates the shared dict in place; no rebinding needed
def _save_stats():
    """Write the whole _stats dict (private "_" keys included) to STATS_FILE.

    Every caller in this file already holds _stats_lock, so this function does
    not take it (threading.Lock is not re-entrant). The JSON goes to a
    temporary sibling that is renamed over STATS_FILE. A crash mid-write
    therefore cannot truncate the saved totals, which would otherwise reload
    as zeroed counters. Failures are printed, not raised.
    """
    directory = os.path.dirname(STATS_FILE)
    tmp_path = f"{STATS_FILE}.{os.getpid()}.{threading.get_ident()}.tmp"
    try:
        if directory:  # bare file name lives in the CWD; os.makedirs("") would raise
            os.makedirs(directory, exist_ok=True)
        with open(tmp_path, "w") as f:
            json.dump(_stats, f, indent=2)
        os.replace(tmp_path, STATS_FILE)
    except (OSError, TypeError, ValueError) as e:
        print(f"Stats save failed: {e}")
        try:
            os.remove(tmp_path)
        except OSError:
            pass
def _calc_marginal_cost(duration_seconds):
    """Price one inference call of *duration_seconds* under COST_CONFIG.

    "marginal" mode bills only the watts above idle (GPU and rest of system);
    any other billing_mode bills the full GPU + system inference draw.
    profit_margin is applied as a multiplier on the electricity cost.

    Returns:
        tuple: (watts billed, energy in Wh, cost in $).
    """
    cfg = COST_CONFIG
    if cfg["billing_mode"] != "marginal":
        # Dedicated: the whole inference draw is billable.
        watts = cfg["gpu_load_watts"] + cfg["system_inference_watts"]
    else:
        gpu_extra = cfg["gpu_load_watts"] - cfg["gpu_idle_watts"]
        system_extra = cfg["system_inference_watts"] - cfg["system_idle_watts"]
        watts = gpu_extra + system_extra
    watt_hours = (watts * duration_seconds) / 3600
    kwh = watt_hours / 1000
    total = kwh * cfg["electricity_rate"] * (1 + cfg["profit_margin"])
    return watts, watt_hours, total
def _track_request(tokens_in, tokens_out, duration_seconds, model="mortdecai:0.4.0"):
    """Bill one completed inference call in the ledger and the running totals.

    Args:
        tokens_in: prompt tokens reported by Ollama.
        tokens_out: generated tokens reported by Ollama.
        duration_seconds: wall-clock time of the upstream call.
        model: model name stored in the ledger entry.

    In "dedicated" mode with base_rate_per_hour > 0, the base rate accrued
    since the previous tracked request is also added to total_cost. That
    charge goes into _stats only, not into the ledger.
    NOTE(review): _last_base_calc is persisted by _save_stats, so after a
    restart the downtime is billed too — confirm that is intended.
    Stats are written to disk on every 10th request.
    """
    marginal_watts, energy_wh, cost = _calc_marginal_cost(duration_seconds)
    # Record in dual ledger
    _ledger_record(tokens_in, tokens_out, duration_seconds, cost, energy_wh, model)
    with _stats_lock:
        now = time.time()
        _stats["total_requests"] += 1
        _stats["total_tokens_in"] += tokens_in
        _stats["total_tokens_out"] += tokens_out
        _stats["total_inference_seconds"] += duration_seconds
        # gmtime(): the trailing "Z" declares UTC; local time would be mislabelled.
        _stats["last_request_at"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(now))
        _stats["total_energy_wh"] += energy_wh
        _stats["total_cost"] += cost
        # Exponential moving average, 5% weight on the newest call.
        _stats["total_marginal_watts_avg"] = (
            _stats.get("total_marginal_watts_avg", marginal_watts) * 0.95 + marginal_watts * 0.05
        )
        if COST_CONFIG["billing_mode"] == "dedicated" and COST_CONFIG["base_rate_per_hour"] > 0:
            last = _stats.get("_last_base_calc", now)
            # Clamp so a backwards wall-clock step never credits money back.
            elapsed_hours = max(0.0, now - last) / 3600
            _stats["total_cost"] += COST_CONFIG["base_rate_per_hour"] * elapsed_hours
            _stats["_last_base_calc"] = now
        if _stats["total_requests"] % 10 == 0:
            _save_stats()  # _stats_lock is held, so the snapshot is consistent
def _check_budget():
    """Tell whether spending is still strictly below the configured cap.

    Returns:
        bool: True while _stats["total_cost"] < COST_CONFIG["spending_cap"].
    """
    with _stats_lock:
        spent = _stats["total_cost"]
        cap = COST_CONFIG["spending_cap"]
    return spent < cap
def _smi_number(text, default):
    """Parse one numeric *-smi field; return *default* for "[N/A]" and other non-numbers."""
    try:
        return float(text)
    except (TypeError, ValueError):
        return default


def _get_gpu_utilization():
    """Snapshot GPU utilization, temperature and power draw.

    Tries nvidia-smi, then rocm-smi, and reports the first GPU listed.
    When the tool gives no power reading (nvidia "[N/A]"-style placeholders,
    or rocm-smi, which is not queried for power here), power_watts falls back
    to COST_CONFIG["gpu_load_watts"]. The original code referenced an
    undefined GPU_TDP_WATTS there.

    Returns:
        dict: {"utilization", "temperature", "power_watts", "source"}; when no
        tool answers, the numbers are 0 and source is "unavailable".
    """
    try:
        # Try nvidia-smi first
        result = subprocess.run(
            ["nvidia-smi", "--query-gpu=utilization.gpu,temperature.gpu,power.draw",
             "--format=csv,noheader,nounits"],
            capture_output=True, text=True, timeout=5,
        )
        rows = result.stdout.strip().splitlines()
        if result.returncode == 0 and rows:
            # One CSV row per GPU; only the first GPU is reported.
            util, temp, power = (p.strip() for p in rows[0].split(",")[:3])
            return {
                "utilization": _smi_number(util, 0.0),
                "temperature": _smi_number(temp, 0.0),
                "power_watts": _smi_number(power, COST_CONFIG["gpu_load_watts"]),
                "source": "nvidia-smi",
            }
    except (OSError, subprocess.SubprocessError, ValueError):
        pass  # tool missing, timed out, or output not in the expected shape
    try:
        # Try rocm-smi for AMD
        result = subprocess.run(
            ["rocm-smi", "--showuse", "--showtemp", "--json"],
            capture_output=True, text=True, timeout=5,
        )
        if result.returncode == 0:
            data = json.loads(result.stdout)
            # Parse rocm-smi JSON (format varies by version): first dict-valued card wins.
            cards = data.values() if isinstance(data, dict) else ()
            for card_data in cards:
                if isinstance(card_data, dict):
                    return {
                        "utilization": _smi_number(card_data.get("GPU use (%)", 0), 0.0),
                        "temperature": _smi_number(card_data.get("Temperature (Sensor edge) (C)", 0), 0.0),
                        "power_watts": COST_CONFIG["gpu_load_watts"],
                        "source": "rocm-smi",
                    }
    except (OSError, subprocess.SubprocessError, ValueError):
        pass
    return {"utilization": 0, "temperature": 0, "power_watts": 0, "source": "unavailable"}
# --- HTTP Handler ---
class GatewayHandler(BaseHTTPRequestHandler):
    """Authenticating, metering HTTP front end for Ollama.

    GET  /health      public; 200 with the model list when Ollama's /api/tags answers
    GET  /dashboard   HTML usage page (no auth check is performed for it here)
    GET  /stats, /config, /ledger, /reconcile   API key required
    POST /config      API key required; live-updates known COST_CONFIG keys
    POST /admin/update-model   API key required; opt-in via ALLOW_MODEL_UPDATES
    anything else     API key required; proxied to OLLAMA_URL and metered
    """

    def log_message(self, fmt, *args):
        """Suppress BaseHTTPRequestHandler's per-request stderr log line."""
        pass  # Quiet

    def _check_auth(self):
        """Return True if the Authorization header carries API_KEY.

        Accepts "Bearer <key>" or the bare key. On failure a 401 JSON reply
        has already been sent when False is returned.
        """
        supplied = self.headers.get("Authorization", "").encode("utf-8", "replace")
        key = API_KEY.encode("utf-8", "replace")
        # compare_digest runs in time independent of where the values differ,
        # so response timing can't be used to guess the key incrementally.
        if hmac.compare_digest(supplied, b"Bearer " + key) or hmac.compare_digest(supplied, key):
            return True
        self._send_json(401, {"error": "Invalid API key"})
        return False

    def _send_json(self, status, data):
        """Send *data* serialized as JSON with the given HTTP status."""
        self._send_bytes(status, json.dumps(data).encode(), "application/json")

    def _send_bytes(self, status, body, content_type):
        """Send a complete response: status, Content-Type, Content-Length, *body* bytes."""
        self.send_response(status)
        self.send_header("Content-Type", content_type)
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    @staticmethod
    def _last_json_object(text):
        """Return the last non-blank line of *text* parsed as a JSON object, else {}.

        Used for newline-delimited JSON replies, presumably Ollama's streaming
        format when a request omits "stream": false. The final chunk is
        assumed to carry prompt_eval_count / eval_count — confirm against the
        Ollama API docs.
        """
        for line in reversed(text.splitlines()):
            line = line.strip()
            if not line:
                continue
            try:
                obj = json.loads(line)
            except ValueError:
                return {}
            return obj if isinstance(obj, dict) else {}
        return {}

    def _proxy_to_ollama(self, path, body=None, method=None):
        """Forward a request to Ollama, meter it, and relay the reply.

        Args:
            path: request path (including any query) appended to OLLAMA_URL.
            body: parsed JSON request body, sent as JSON on POST.
            method: "GET" or "POST". Defaults to POST when *body* is not None,
                otherwise GET. Timeouts are 120 s for POST and 10 s for GET.

        Replies 402 once the spending cap is reached. Token counts
        (prompt_eval_count / eval_count) in the reply are billed through
        _track_request. JSON-object replies gain a "_gateway" metadata key;
        replies that are not a single JSON document (e.g. NDJSON streams,
        plain text) are relayed byte-for-byte. Upstream failures map to 502
        (connection refused), 504 (timeout) and 500 (anything else).
        """
        if not _check_budget():
            with _stats_lock:
                _stats["requests_rejected"] += 1
                spent = _stats["total_cost"]
            self._send_json(402, {
                "error": "Spending cap reached",
                "total_cost": spent,
                "cap": COST_CONFIG["spending_cap"],  # live cap (POST /config may change it)
            })
            return
        if method is None:
            method = "POST" if body is not None else "GET"
        url = f"{OLLAMA_URL}{path}"
        t0 = time.time()
        try:
            if method == "POST":
                r = requests.post(url, json=body, timeout=120)
            else:
                r = requests.get(url, timeout=10)
            duration = time.time() - t0
            try:
                data = r.json()
            except ValueError:
                data = None  # not a single JSON document
            if isinstance(data, dict):
                usage = data
            elif data is None:
                usage = self._last_json_object(r.text)
            else:
                usage = {}
            # Track token usage from response (a null count is treated as 0)
            tokens_in = usage.get("prompt_eval_count") or 0
            tokens_out = usage.get("eval_count") or 0
            model_name = body.get("model", "unknown") if isinstance(body, dict) else "unknown"
            if tokens_in or tokens_out:
                _track_request(tokens_in, tokens_out, duration, model_name)
            if data is None:
                self._send_bytes(
                    r.status_code,
                    r.content,
                    r.headers.get("Content-Type", "application/octet-stream"),
                )
                return
            # Add gateway metadata to response
            if isinstance(data, dict):
                mw, ewh, ecost = _calc_marginal_cost(duration)
                with _stats_lock:
                    spent = _stats["total_cost"]
                data["_gateway"] = {
                    "duration_seconds": round(duration, 2),
                    "marginal_watts": round(mw, 1),
                    "energy_wh": round(ewh, 4),
                    "estimated_cost": round(ecost, 6),
                    "total_cost": round(spent, 4),
                    "budget_remaining": round(COST_CONFIG["spending_cap"] - spent, 4),
                    "billing_mode": COST_CONFIG["billing_mode"],
                }
            self._send_json(r.status_code, data)
        except requests.exceptions.ConnectionError:
            self._send_json(502, {"error": "Ollama is not running"})
        except requests.exceptions.Timeout:
            self._send_json(504, {"error": "Ollama timeout"})
        except Exception as e:
            self._send_json(500, {"error": str(e)})

    def do_GET(self):
        """Serve the gateway's own GET endpoints; proxy every other path to Ollama."""
        route = urlparse(self.path).path
        # Public endpoints (no auth)
        if route == "/health":
            try:
                r = requests.get(f"{OLLAMA_URL}/api/tags", timeout=5)
                models = [m["name"] for m in r.json().get("models", [])]
            except (requests.RequestException, ValueError, KeyError, TypeError, AttributeError):
                self._send_json(503, {"status": "error", "ollama": "disconnected"})
                return
            self._send_json(200, {"status": "ok", "ollama": "connected", "models": models})
            return
        if route == "/stats":
            if not self._check_auth():
                return
            gpu = _get_gpu_utilization()
            with _stats_lock:
                stats_copy = {k: v for k, v in _stats.items() if not k.startswith("_")}
            stats_copy["gpu"] = gpu
            stats_copy["cost_config"] = dict(COST_CONFIG)
            self._send_json(200, stats_copy)
            return
        if route == "/config":
            if not self._check_auth():
                return
            self._send_json(200, COST_CONFIG)
            return
        if route == "/ledger":
            if not self._check_auth():
                return
            entries = _ledger_load()
            total_cost = sum(e.get("cost", 0) for e in entries if isinstance(e, dict))
            self._send_json(200, {
                "entries": len(entries),
                "total_cost": round(total_cost, 6),
                "last_10": entries[-10:],
            })
            return
        if route == "/reconcile":
            if not self._check_auth():
                return
            entries = _ledger_load()
            verification = _ledger_verify(entries)
            ledger_cost = sum(e.get("cost", 0) for e in entries if isinstance(e, dict))
            with _stats_lock:
                stats_cost = _stats.get("total_cost", 0)
            # Note: _track_request adds dedicated-mode base-rate charges to _stats
            # only, so those show up here as a discrepancy.
            self._send_json(200, {
                "ledger_entries": len(entries),
                "ledger_total_cost": round(ledger_cost, 6),
                "stats_total_cost": round(stats_cost, 6),
                "discrepancy": round(abs(ledger_cost - stats_cost), 6),
                "hash_verification": verification,
                "status": "OK" if verification["invalid"] == 0 else "TAMPERED",
            })
            return
        if route == "/dashboard":
            self._serve_dashboard()
            return
        # Proxy everything else to Ollama
        if not self._check_auth():
            return
        self._proxy_to_ollama(self.path)

    def do_POST(self):
        """Handle /config and /admin/update-model locally; proxy other POSTs to Ollama."""
        if not self._check_auth():
            return
        try:
            length = int(self.headers.get("Content-Length", 0))
            body = json.loads(self.rfile.read(length)) if length > 0 else None
        except ValueError:
            self._send_json(400, {"error": "Request body is not valid JSON"})
            return
        # Config update endpoint — adjust cost parameters live
        if self.path == "/config" and body:
            self._update_cost_config(body)
            return
        # Model update endpoint — downloads new GGUF and reloads
        if self.path == "/admin/update-model" and body:
            self._handle_model_update(body)
            return
        self._proxy_to_ollama(self.path, body, method="POST")

    def _update_cost_config(self, body):
        """Apply a POST /config body to COST_CONFIG, persist it, and reply.

        Only keys already in COST_CONFIG are applied; each value is cast to
        the type of its _DEFAULT_COST_CONFIG default (str when there is none).
        All-or-nothing: if any value fails to convert, nothing changes and 400
        is returned.
        """
        if not isinstance(body, dict):
            self._send_json(400, {"error": "Config update must be a JSON object"})
            return
        updates = {}
        for key, value in body.items():
            if key not in COST_CONFIG:
                continue
            cast = type(_DEFAULT_COST_CONFIG.get(key, ""))
            try:
                updates[key] = cast(value)
            except (TypeError, ValueError):
                self._send_json(400, {"error": f"Invalid value for {key}: {value!r}"})
                return
        COST_CONFIG.update(updates)  # in-place: every reader shares this dict
        _save_cost_config(COST_CONFIG)
        self._send_json(200, {"status": "updated", "config": COST_CONFIG})

    def _handle_model_update(self, body):
        """Download a new GGUF from a URL and reload the model.

        Request: {"url": "https://mortdec.ai/dl/...", "name": "mortdecai:0.5.0"}
        This is opt-in — the gateway operator must enable ALLOW_MODEL_UPDATES=true.

        The file is streamed to models/<name>.gguf.part (relative to the working
        directory) and renamed into place only once complete. Then
        `ollama create <name> -f /models/Modelfile` runs inside the
        mortdecai-ollama container.
        NOTE(review): presumably ./models is mounted at /models in that
        container, and the existing Modelfile decides which GGUF is loaded (its
        path is not derived from <name>) — confirm.
        """
        if os.environ.get("ALLOW_MODEL_UPDATES", "false").lower() != "true":
            self._send_json(403, {"error": "Model updates disabled. Set ALLOW_MODEL_UPDATES=true in .env to enable."})
            return
        if not isinstance(body, dict):
            self._send_json(400, {"error": "Request body must be a JSON object"})
            return
        url = body.get("url")
        name = body.get("name", "mortdecai-latest")
        if not url:
            self._send_json(400, {"error": "url is required"})
            return
        # `name` becomes a file name and an argv entry. Reject path separators
        # and dot-names (escape from models/) and a leading "-" (option injection).
        if (not isinstance(name, str) or name in ("", ".", "..")
                or os.path.basename(name) != name or name.startswith("-")):
            self._send_json(400, {"error": "invalid model name"})
            return
        target = f"models/{name}.gguf"
        partial = f"{target}.part"
        try:
            print(f"Downloading model from {url}...")
            with requests.get(url, stream=True, timeout=600) as r:
                r.raise_for_status()
                with open(partial, "wb") as f:
                    for chunk in r.iter_content(chunk_size=8192):
                        f.write(chunk)
            # Only a complete download replaces the previous file.
            os.replace(partial, target)
            subprocess.run(
                ["docker", "exec", "mortdecai-ollama", "ollama", "create", name, "-f", "/models/Modelfile"],
                timeout=120, check=True,
            )
            self._send_json(200, {"status": "ok", "model": name, "message": "Model updated and loaded"})
        except Exception as e:
            try:
                os.remove(partial)  # don't leave a truncated download behind
            except OSError:
                pass
            self._send_json(500, {"error": f"Update failed: {e}"})

    def _serve_dashboard(self):
        """Render the HTML usage dashboard (auto-refreshes every 10 s)."""
        with _stats_lock:
            s = {k: v for k, v in _stats.items() if not k.startswith("_")}
        gpu = _get_gpu_utilization()
        c = COST_CONFIG
        # Per-call billable draw for the active billing mode (full draw in dedicated mode).
        marginal_w = _calc_marginal_cost(0)[0]
        active = _check_budget()
        avg_cost_per_req = s["total_cost"] / max(s["total_requests"], 1)
        reqs_remaining = (
            max(0, int((c["spending_cap"] - s["total_cost"]) / max(avg_cost_per_req, 0.000001)))
            if avg_cost_per_req > 0 else ""
        )
        # billing_mode is settable through POST /config: escape before embedding in HTML.
        billing_mode = html.escape(str(c["billing_mode"]))
        gpu_source = html.escape(str(gpu["source"]))
        base_rate_row = (
            '<div class="stat"><span class="label">Base Rate</span><span class="value">$'
            + f"{c['base_rate_per_hour']:.3f}" + '/hr</span></div>'
            if c["billing_mode"] == "dedicated" else ""
        )
        page = f"""<!DOCTYPE html>
<html><head><title>Mortdecai Gateway</title>
<meta http-equiv="refresh" content="10">
<style>
body {{ font-family: monospace; background: #1a1a1a; color: #e0e0e0; padding: 2rem; max-width: 700px; margin: 0 auto; }}
h1 {{ color: #D35400; }}
h2 {{ color: #D35400; font-size: 1rem; margin-top: 1.5rem; border-bottom: 1px solid #333; padding-bottom: 0.3rem; }}
.stat {{ background: #252525; border: 1px solid #333; padding: 0.8rem 1rem; margin: 0.3rem 0; border-radius: 4px; display: flex; justify-content: space-between; }}
.label {{ color: #999; }}
.value {{ color: #D35400; font-weight: bold; }}
.ok {{ color: #4caf50; }}
.warn {{ color: #ff9800; }}
.bad {{ color: #f44336; }}
.bar {{ background: #333; border-radius: 3px; height: 20px; margin: 0.5rem 0; }}
.bar-fill {{ background: #D35400; height: 100%; border-radius: 3px; transition: width 0.5s; }}
</style></head><body>
<h1>Mortdecai Gateway</h1>
<div class="stat"><span class="label">Status</span>
<span class="value {'ok' if active else 'bad'}">{'● ACTIVE' if active else '● PAUSED (cap reached)'}</span></div>
<h2>Usage</h2>
<div class="stat"><span class="label">Requests</span><span class="value">{s['total_requests']:,}</span></div>
<div class="stat"><span class="label">Tokens (in / out)</span><span class="value">{s['total_tokens_in']:,} / {s['total_tokens_out']:,}</span></div>
<div class="stat"><span class="label">Inference Time</span><span class="value">{s['total_inference_seconds']:.0f}s ({s['total_inference_seconds']/3600:.1f}h)</span></div>
<div class="stat"><span class="label">Avg per Request</span><span class="value">{s['total_inference_seconds']/max(s['total_requests'],1):.1f}s, {s['total_tokens_out']//max(s['total_requests'],1)} tokens</span></div>
<div class="stat"><span class="label">Rejected (cap)</span><span class="value">{s['requests_rejected']}</span></div>
<div class="stat"><span class="label">Last Request</span><span class="value">{s['last_request_at'] or 'never'}</span></div>
<h2>Cost</h2>
<div class="bar"><div class="bar-fill" style="width: {min(s['total_cost']/max(c['spending_cap'],0.01)*100, 100):.0f}%"></div></div>
<div class="stat"><span class="label">Spent</span><span class="value">${s['total_cost']:.4f}</span></div>
<div class="stat"><span class="label">Budget</span><span class="value">${c['spending_cap']:.2f}</span></div>
<div class="stat"><span class="label">Remaining</span><span class="value">${c['spending_cap'] - s['total_cost']:.4f} (~{reqs_remaining} requests)</span></div>
<div class="stat"><span class="label">Avg Cost/Request</span><span class="value">${avg_cost_per_req:.6f}</span></div>
<div class="stat"><span class="label">Energy Used</span><span class="value">{s['total_energy_wh']:.1f} Wh ({s['total_energy_wh']/1000:.4f} kWh)</span></div>
<h2>Labor & Profit</h2>
<div class="stat"><span class="label">Labor Rate</span><span class="value">${c['labor_rate_per_hour']:.2f}/hr</span></div>
<div class="stat"><span class="label">Hours Logged</span><span class="value">{c['labor_hours_logged']:.1f}h</span></div>
<div class="stat"><span class="label">Labor Cost</span><span class="value">${c['labor_rate_per_hour'] * c['labor_hours_logged']:.2f}</span></div>
<div class="stat"><span class="label">Profit Margin</span><span class="value">{c['profit_margin']*100:.0f}%</span></div>
<div class="stat"><span class="label">Total Owed (electricity + labor + margin)</span><span class="value">${s['total_cost'] + c['labor_rate_per_hour'] * c['labor_hours_logged']:.4f}</span></div>
<h2>Power Model</h2>
<div class="stat"><span class="label">Billing Mode</span><span class="value">{billing_mode}</span></div>
<div class="stat"><span class="label">GPU (idle → load)</span><span class="value">{c['gpu_idle_watts']}W → {c['gpu_load_watts']}W</span></div>
<div class="stat"><span class="label">System (idle → load)</span><span class="value">{c['system_idle_watts']}W → {c['system_inference_watts']}W</span></div>
<div class="stat"><span class="label">Marginal Draw</span><span class="value">{marginal_w}W per inference call</span></div>
<div class="stat"><span class="label">Electricity Rate</span><span class="value">${c['electricity_rate']}/kWh</span></div>
{base_rate_row}
<h2>GPU</h2>
<div class="stat"><span class="label">Utilization</span><span class="value">{gpu['utilization']}%</span></div>
<div class="stat"><span class="label">Temperature</span><span class="value {'warn' if gpu['temperature'] > 75 else 'ok'}">{gpu['temperature']}°C</span></div>
<div class="stat"><span class="label">Power Draw</span><span class="value">{gpu['power_watts']}W</span></div>
<div class="stat"><span class="label">Source</span><span class="value">{gpu_source}</span></div>
<p style="color:#555; font-size:0.8rem; margin-top:2rem;">
Config: GET /config | Update: POST /config | Stats: GET /stats (auth required)
</p>
</body></html>"""
        # The page contains non-ASCII glyphs (●, →, °), so declare the charset.
        self._send_bytes(200, page.encode("utf-8"), "text/html; charset=utf-8")
def main():
    """Run the gateway until interrupted.

    Restores saved stats and prints the active cost model. Stats are saved
    every 60 s from a daemon thread. Each request is served on its own
    thread, so a long inference cannot block /health or /dashboard. Stats are
    saved once more on Ctrl-C or SIGTERM.
    """
    _load_stats()
    c = COST_CONFIG
    print(f"Mortdecai Gateway starting")
    print(f" Ollama: {OLLAMA_URL}")
    print(f" Listen: 0.0.0.0:{LISTEN_PORT}")
    print(f" GPU: {c['gpu_idle_watts']}W idle → {c['gpu_load_watts']}W load")
    print(f" System: {c['system_idle_watts']}W idle → {c['system_inference_watts']}W load")
    print(f" Rate: ${c['electricity_rate']}/kWh | Mode: {c['billing_mode']}")
    print(f" Cap: ${c['spending_cap']}")
    print(f" Dashboard: http://localhost:{LISTEN_PORT}/dashboard")
    # Both defaults are hard-coded literals in this file, i.e. effectively public.
    if API_KEY == "mk_mortdecai_default":
        print(" WARNING: API_KEY is the built-in default; set API_KEY before exposing this port")
    if LEDGER_SECRET == "change_me_shared_secret":
        print(" WARNING: LEDGER_SECRET is the built-in default; ledger hashes can be forged")

    def _periodic_save():
        # Save stats periodically; daemon thread, so it ends with the process.
        while True:
            time.sleep(60)
            with _stats_lock:
                _save_stats()

    threading.Thread(target=_periodic_save, daemon=True).start()

    def _on_sigterm(signum, frame):
        # Turn SIGTERM into a normal exit so the final save in `finally` runs.
        raise SystemExit(0)

    signal.signal(signal.SIGTERM, _on_sigterm)
    server = ThreadingHTTPServer(("0.0.0.0", LISTEN_PORT), GatewayHandler)
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        pass
    finally:
        server.server_close()
        with _stats_lock:
            _save_stats()


if __name__ == "__main__":
    main()