Files
mortdecai-gateway/ledger_receiver.py
Seth 968b00890f Dual ledger: tamper-proof transaction tracking on both sides
Every inference request is recorded in a local JSONL ledger with a
SHA-256 hash of (id + tokens + duration + cost + shared_secret).

Both sides keep independent copies:
- Gateway (Matt's): writes to ledger.jsonl on every request
- Receiver (Seth's): receives callbacks, saves per-gateway ledger

Endpoints:
- GET /ledger — view transactions + total cost
- GET /reconcile — compare ledger vs stats, verify all hashes
- POST /config — adjust cost params live

ledger_receiver.py runs on Seth's server:
- POST /transaction — receive and verify gateway callbacks
- GET /summary — total cost per gateway
- GET /ledger — all transactions across gateways

If either side resets stats, the other's ledger has the full history.
If either side tampers with entries, hash verification catches it.

Tested: request → ledger write → reconcile → hash valid → zero discrepancy

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-20 19:56:10 -04:00

148 lines
4.9 KiB
Python

#!/usr/bin/env python3
"""
Ledger Receiver — runs on YOUR server to collect transaction records from remote gateways.
Each gateway POSTs transactions here. You keep an independent copy of every
transaction with hash verification. If the gateway operator resets their stats,
your ledger still has the full history.
Usage:
python3 ledger_receiver.py
LEDGER_SECRET=shared_secret python3 ledger_receiver.py
Endpoints:
POST /transaction — receive a transaction from a gateway
GET /ledger — view all transactions
GET /reconcile/<host> — compare your ledger against a gateway's
GET /summary — total cost by gateway
"""
import json
import os
import hashlib
import threading
import time
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import urlparse
LISTEN_PORT = int(os.environ.get("RECEIVER_PORT", "8435"))
LEDGER_DIR = os.environ.get("LEDGER_DIR", "/var/lib/mortdecai-ledger")
LEDGER_SECRET = os.environ.get("LEDGER_SECRET", "change_me_shared_secret")
_lock = threading.Lock()
def _verify_hash(entry):
raw = f"{entry['id']}|{entry['tokens_in']}|{entry['tokens_out']}|{entry['duration']}|{entry['cost']}|{LEDGER_SECRET}"
expected = hashlib.sha256(raw.encode()).hexdigest()[:16]
return entry.get("hash") == expected
def _save_transaction(entry, source_ip):
"""Save a transaction to the per-gateway ledger file."""
entry["_received_at"] = time.strftime("%Y-%m-%dT%H:%M:%SZ")
entry["_source_ip"] = source_ip
entry["_hash_valid"] = _verify_hash(entry)
os.makedirs(LEDGER_DIR, exist_ok=True)
# One file per source IP
safe_ip = source_ip.replace(":", "_").replace(".", "_")
path = os.path.join(LEDGER_DIR, f"ledger_{safe_ip}.jsonl")
with _lock:
with open(path, "a") as f:
f.write(json.dumps(entry) + "\n")
def _load_all():
"""Load all ledger entries from all gateways."""
all_entries = {}
try:
for fname in os.listdir(LEDGER_DIR):
if fname.endswith(".jsonl"):
gateway = fname.replace("ledger_", "").replace(".jsonl", "")
entries = []
with open(os.path.join(LEDGER_DIR, fname)) as f:
for line in f:
if line.strip():
entries.append(json.loads(line))
all_entries[gateway] = entries
except:
pass
return all_entries
class ReceiverHandler(BaseHTTPRequestHandler):
def log_message(self, fmt, *args):
pass
def _send_json(self, status, data):
body = json.dumps(data, indent=2).encode()
self.send_response(status)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(body)
def do_POST(self):
if self.path == "/transaction":
length = int(self.headers.get("Content-Length", 0))
entry = json.loads(self.rfile.read(length))
source_ip = self.client_address[0]
valid = _verify_hash(entry)
_save_transaction(entry, source_ip)
self._send_json(200, {
"status": "recorded",
"id": entry.get("id"),
"hash_valid": valid,
})
return
self._send_json(404, {"error": "not found"})
def do_GET(self):
parsed = urlparse(self.path)
if parsed.path == "/summary":
all_data = _load_all()
summary = {}
for gateway, entries in all_data.items():
total_cost = sum(e.get("cost", 0) for e in entries)
total_tokens = sum(e.get("tokens_out", 0) for e in entries)
valid = sum(1 for e in entries if e.get("_hash_valid", False))
invalid = len(entries) - valid
summary[gateway] = {
"transactions": len(entries),
"total_cost": round(total_cost, 6),
"total_tokens_out": total_tokens,
"hashes_valid": valid,
"hashes_invalid": invalid,
}
self._send_json(200, summary)
return
if parsed.path == "/ledger":
all_data = _load_all()
flat = []
for entries in all_data.values():
flat.extend(entries)
flat.sort(key=lambda e: e.get("timestamp", ""))
total = sum(e.get("cost", 0) for e in flat)
self._send_json(200, {
"total_transactions": len(flat),
"total_cost": round(total, 6),
"last_20": flat[-20:],
})
return
self._send_json(404, {"error": "not found"})
if __name__ == "__main__":
os.makedirs(LEDGER_DIR, exist_ok=True)
print(f"Ledger Receiver on port {LISTEN_PORT}")
print(f" Ledger dir: {LEDGER_DIR}")
HTTPServer(("0.0.0.0", LISTEN_PORT), ReceiverHandler).serve_forever()