463 lines
18 KiB
Python
463 lines
18 KiB
Python
#!/usr/bin/env python3
"""
POS Daily Briefing — Epson TM-m30
Fetches unread news from FreshRSS, summarizes via Ollama, prints ESC/POS receipt.
"""
|
|
import os
|
|
import re
|
|
import json
|
|
import socket
|
|
import requests
|
|
import yfinance as yf
|
|
from datetime import datetime
|
|
|
|
# ── Config ────────────────────────────────────────────────────────────────────
|
|
# config.json is expected to sit next to this script.
CONFIG_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "config.json")

# JSON is UTF-8 by specification; pass the encoding explicitly so loading does
# not depend on the locale of whatever process runs the script.
with open(CONFIG_PATH, encoding="utf-8") as f:
    config = json.load(f)

# Required settings: a missing key raises KeyError while the module loads.
FRESHRSS_URL = config["freshrss_url"]
FRESHRSS_USER = config["freshrss_user"]
FRESHRSS_TOKEN = config["freshrss_api_key"]
OLLAMA_URL = config["ollama_url"]
OLLAMA_MODEL = config["ollama_model"]
YOURLS_URL = config["yourls_url"]
YOURLS_USER = config["yourls_user"]
YOURLS_PASS = config["yourls_pass"]
GRAFANA_URL = config["grafana_url"]
GRAFANA_TOKEN = config["grafana_token"]
REMOTE_HOST = config["remote_host"]
REMOTE_PASS = config["remote_pass"]
PRINTER_IP = config["printer_ip"]
# Optional setting: falls back to 9100 when "printer_port" is absent.
PRINTER_PORT = int(config.get("printer_port", 9100))

# TM-m30 80mm paper, Font B — column width measured from test print
COLS = 57
|
|
|
|
# ── FreshRSS ──────────────────────────────────────────────────────────────────
|
|
def _freshrss_login():
    """Log in to the FreshRSS Google-Reader-style API; return the auth token or None."""
    try:
        resp = requests.post(
            f"{FRESHRSS_URL}/api/greader.php/accounts/ClientLogin",
            data={"Email": FRESHRSS_USER, "Passwd": FRESHRSS_TOKEN},
            timeout=15,
        )
    except Exception as e:
        print(f"[!] Login exception: {e}")
        return None
    if resp.status_code != 200:
        print(f"[!] Login failed: {resp.status_code}")
        return None
    # The login response is a list of KEY=VALUE lines; the token is on "Auth=".
    auth_token = next(
        (line.split("=", 1)[1] for line in resp.text.splitlines()
         if line.startswith("Auth=")),
        None,
    )
    if not auth_token:
        print("[!] No auth token in login response")
        return None
    return auth_token


def _first_href(links):
    """Return the href of the first link in a canonical/alternate list, or ""."""
    if isinstance(links, list) and links and isinstance(links[0], dict):
        return links[0].get("href") or ""
    return ""


def _parse_item(item):
    """Convert one raw API entry into the flat article dict used by this script."""
    import html

    long_id = item.get("id") or ""
    # `or {}` tolerates entries whose summary/content/origin is explicitly null.
    body = ((item.get("summary") or {}).get("content")
            or (item.get("content") or {}).get("content")
            or "")
    url = _first_href(item.get("canonical")) or _first_href(item.get("alternate"))
    if not url:
        # Last resort: link to the entry inside the FreshRSS web UI.
        # NOTE(review): this host is hard-coded rather than derived from
        # FRESHRSS_URL — confirm that is intentional.
        item_id = long_id.split("/")[-1] if "/" in long_id else ""
        url = f"https://fresh.sethpc.xyz/i/?c=entry&a=read&id={item_id}"
    return {
        "id": long_id,
        "title": item.get("title") or "",
        # Strip tags first, then decode entities such as &amp; to plain text.
        "summary": html.unescape(re.sub(r'<[^<]+?>', '', body)).strip(),
        "url": url,
        "source": (item.get("origin") or {}).get("title", "Unknown"),
    }


def fetch_news():
    """Fetch up to 100 reading-list entries from FreshRSS.

    The request passes the "read" state as the `xt` parameter (the module's
    purpose is to fetch unread news).

    Returns:
        A tuple ``(items, auth_token)``. ``items`` is a list of dicts with the
        keys ``id``, ``title``, ``summary`` (HTML tags removed), ``url`` and
        ``source``. ``auth_token`` is None when login failed; otherwise it is
        returned even if the fetch itself failed. Errors are printed, never
        raised, and yield an empty item list.
    """
    auth_token = _freshrss_login()
    if not auth_token:
        return [], None

    headers = {"Authorization": f"GoogleLogin auth={auth_token}"}
    try:
        r = requests.get(
            f"{FRESHRSS_URL}/api/greader.php/reader/api/0/stream/contents/user/-/state/com.google/reading-list",
            params={"n": 100, "xt": "user/-/state/com.google/read", "output": "json"},
            headers=headers,
            timeout=15,
        )
        if r.status_code != 200:
            print(f"[!] Fetch failed: {r.status_code}")
            return [], auth_token
        entries = r.json().get("items", [])
    except Exception as e:
        print(f"[!] Fetch exception: {e}")
        return [], auth_token

    items = []
    for entry in entries:
        # Skip a malformed entry instead of discarding the whole batch.
        try:
            items.append(_parse_item(entry))
        except (AttributeError, TypeError, KeyError, IndexError) as e:
            print(f"[!] Skipping malformed item: {e}")
    return items, auth_token
|
|
|
|
|
|
def mark_read(item_ids, auth_token):
    """Tag the given FreshRSS entries as read.

    Args:
        item_ids: List of entry ids as returned by the FreshRSS API.
        auth_token: Auth token from the login step.

    Does nothing when either argument is empty. Errors are printed, never
    raised. The success message is printed only if both HTTP requests
    returned a non-error status.
    """
    if not item_ids or not auth_token:
        return
    headers = {"Authorization": f"GoogleLogin auth={auth_token}"}
    try:
        # A short-lived token must accompany the edit request as "T".
        token_resp = requests.get(
            f"{FRESHRSS_URL}/api/greader.php/reader/api/0/token",
            headers=headers, timeout=10
        )
        token_resp.raise_for_status()
        edit_resp = requests.post(
            f"{FRESHRSS_URL}/api/greader.php/reader/api/0/edit-tag",
            headers=headers,
            data={"i": item_ids, "a": "user/-/state/com.google/read",
                  "T": token_resp.text.strip()},
            timeout=10
        )
        edit_resp.raise_for_status()
    except Exception as e:
        print(f"[!] mark_read error: {e}")
        return
    print(f"Marked {len(item_ids)} articles as read.")
|
|
|
|
|
|
# ── Ollama ────────────────────────────────────────────────────────────────────
|
|
def _ollama_generate(prompt, temperature, timeout, json_mode=False):
    """POST a non-streaming generate request to Ollama; return the response text."""
    payload = {
        "model": OLLAMA_MODEL,
        "prompt": prompt,
        "stream": False,
        "options": {"num_ctx": 16384, "temperature": temperature},
    }
    if json_mode:
        payload["format"] = "json"
    reply = requests.post(f"{OLLAMA_URL}/api/generate", json=payload, timeout=timeout).json()
    return reply.get("response", "")


def _as_list(parsed):
    """Return `parsed` if it is a list, the first list value if it is a dict, else []."""
    if isinstance(parsed, list):
        return parsed
    if isinstance(parsed, dict):
        for value in parsed.values():
            if isinstance(value, list):
                return value
    return []


def _valid_index(value, limit):
    """Return True when `value` is a real int (not bool) inside range(limit)."""
    return isinstance(value, int) and not isinstance(value, bool) and 0 <= value < limit


def _pick_candidates(items, selected_indices, count, per_source=2):
    """Choose up to `count` items: model picks first, then the remaining items in order.

    No item is taken twice and at most `per_source` items share one source.
    """
    chosen = []
    taken = set()
    source_counts = {}

    def take(idx):
        src = items[idx]["source"]
        if idx in taken or source_counts.get(src, 0) >= per_source:
            return
        taken.add(idx)
        source_counts[src] = source_counts.get(src, 0) + 1
        chosen.append(items[idx])

    for idx in selected_indices:
        if len(chosen) >= count:
            break
        if _valid_index(idx, len(items)):
            take(idx)
    # Top up from the full list when the model returned too few usable picks.
    for idx in range(len(items)):
        if len(chosen) >= count:
            break
        take(idx)
    return chosen


def summarize_news(items, count=5, length="30-50 words"):
    """Select `count` articles and attach a short summary to each.

    Phase 1 asks the model to choose article indices from the headlines;
    phase 2 asks it to summarize the chosen articles. If a phase fails, the
    function falls back to the first items in order (selection) or to the
    first 200 characters of the feed summary (summarization).

    Args:
        items: Article dicts with at least ``source``, ``title`` and ``summary``.
        count: Maximum number of articles to return.
        length: Free-text length hint inserted into the summary prompt.

    Returns:
        The chosen article dicts (the same objects as in `items`), each with
        an ``ai_summary`` string added. Returns [] when `items` is empty.
    """
    if not items:
        return []

    print(f"Phase 1: Selecting top {count} from {len(items)} items...")
    titles_input = "\n".join(
        f"ID {i} [{item['source']}]: {item['title']}" for i, item in enumerate(items)
    )
    prompt_select = (
        f"Select exactly {count} stories for a daily briefing:\n"
        "1. Priorities: Tech, Science, World News, General, Health.\n"
        "2. Variety: Max 2 stories per source.\n"
        "3. Entertainment: Max 2 total.\n"
        "4. Omit: Lottery, minor crime.\n"
        "5. Sports: Football (College/NFL) only.\n"
        f"Format: [0, 5, 12, ...]\n\nHEADLINES:\n{titles_input}"
    )
    try:
        raw_selection = _ollama_generate(prompt_select, 0.1, 300, json_mode=True)
        selected_indices = _as_list(json.loads(raw_selection or "[]"))
    except Exception as e:
        print(f"[!] Selection error: {e}")
        selected_indices = list(range(min(count, len(items))))

    candidates = _pick_candidates(items, selected_indices, count)

    print(f"Phase 2: Summarizing {len(candidates)} items ({length})...")
    content_input = "".join(
        f"ARTICLE {i} ({item['title']}):\n{item['summary'][:2000]}\n\n"
        for i, item in enumerate(candidates)
    )
    prompt_summary = (
        "You are a professional news editor. Write a daily briefing.\n"
        f"For EACH article, write a concise summary ({length}).\n"
        "LANGUAGE RULE: Use English. Translate non-English/Spanish articles.\n"
        "Return STRICTLY as a JSON list: [{\"id\": 0, \"summary\": \"...\"}, ...]\n\n"
        f"CONTENT:\n{content_input}"
    )
    try:
        raw = _ollama_generate(prompt_summary, 0.3, 600)
        # The model may wrap the JSON in prose; grab the outermost [...] span.
        match = re.search(r'\[.*\]', raw, re.DOTALL)
        summaries = _as_list(json.loads(match.group(0) if match else raw))
    except Exception as e:
        print(f"[!] Summarize parse error: {e} — using raw summaries")
        summaries = []

    ai_text = {}
    for entry in summaries:
        if not isinstance(entry, dict):
            continue
        idx = entry.get("id")
        text = entry.get("summary")
        if _valid_index(idx, len(candidates)) and isinstance(text, str) and text.strip():
            ai_text[idx] = text.strip()

    # Every selected article is returned; ones the model skipped or answered
    # badly fall back to a truncated feed summary instead of being dropped.
    for i, article in enumerate(candidates):
        article["ai_summary"] = ai_text.get(i) or (article.get("summary") or "")[:200]
    return candidates
|
|
|
|
|
|
# ── Dashboard data ────────────────────────────────────────────────────────────
|
|
def get_zfs_status():
    """Query the `tank` ZFS pool on the remote host over SSH.

    Returns:
        A dict with the keys ``name``, ``size``, ``alloc``, ``free``, ``cap``
        and ``health`` taken from the first six whitespace-separated fields of
        the `zpool list` output, or None when the command fails, times out or
        prints fewer than six fields.
    """
    import subprocess

    # Argument list instead of a shell string: a quote in the password can no
    # longer break (or inject into) the command line. `sshpass -e` reads the
    # password from the SSHPASS environment variable, keeping it off argv.
    cmd = [
        "sshpass", "-e",
        "ssh", "-o", "StrictHostKeyChecking=no",
        f"root@{REMOTE_HOST}",
        "zpool list tank -H -o name,size,alloc,free,cap,health",
    ]
    env = {**os.environ, "SSHPASS": str(REMOTE_PASS)}
    try:
        proc = subprocess.run(cmd, env=env, capture_output=True, text=True, timeout=30)
    except (OSError, subprocess.SubprocessError) as e:
        print(f"[!] ZFS status error: {e}")
        return None
    if proc.returncode != 0:
        print(f"[!] ZFS status failed (exit {proc.returncode}): {proc.stderr.strip()}")
        return None
    res = proc.stdout.strip().split()
    if len(res) >= 6:
        return {"name": res[0], "size": res[1], "alloc": res[2],
                "free": res[3], "cap": res[4], "health": res[5]}
    return None
|
|
|
|
|
|
def get_grafana_metrics():
    """Collect CPU, RAM and root-disk usage for the listed hosts via Grafana.

    Each query goes through Grafana's datasource proxy (datasource id 1) and
    only results whose `instance` label (port stripped) is one of the
    hard-coded hosts are kept.

    Returns:
        A list of strings, one per host that produced data, of the form
        ``H<last IP octet>:<metric entries>``. A failed query is logged and
        skipped, so results from the other queries survive. The list is empty
        when nothing could be collected.
    """
    headers = {"Authorization": f"Bearer {GRAFANA_TOKEN}"}
    proxmox_hosts = {"192.168.0.173", "192.168.0.112", "192.168.0.197"}
    metrics_data = {}
    queries = {
        "CPU": '100 * (1 - avg by(instance)(irate(node_cpu_seconds_total{mode="idle"}[5m])))',
        "RAM": '100 * (1 - node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes)',
        "Disk": '100 - ((node_filesystem_avail_bytes{mountpoint="/"} * 100) / node_filesystem_size_bytes{mountpoint="/"})',
    }
    for label, promql in queries.items():
        try:
            payload = requests.get(
                f"{GRAFANA_URL}/api/datasources/proxy/1/api/v1/query",
                params={"query": promql}, headers=headers, timeout=10
            ).json()
            for res in payload.get("data", {}).get("result", []):
                inst = res.get("metric", {}).get("instance", "").split(":")[0]
                if inst in proxmox_hosts:
                    # Each sample's "value" is indexed at [1] for the reading.
                    reading = float(res.get("value", [0, 0])[1])
                    metrics_data.setdefault(inst, []).append(f"{label}:{reading:.1f}%")
        except Exception as e:
            print(f"[!] Grafana {label} query error: {e}")
    # NOTE(review): entries for one host are joined with no separator
    # (e.g. "CPU:1.0%RAM:2.0%") — confirm this compact form is intended.
    return [f"H{h.split('.')[-1]}:{''.join(s)}" for h, s in metrics_data.items()]
|
|
|
|
|
|
def get_weather():
    """Build a compact four-day forecast line from the Open-Meteo API.

    Returns:
        A string of ``Day:Cond hi/lo(pop%)`` segments joined by "|", with
        temperatures in Fahrenheit as requested in the URL, or None if the
        request or parsing fails (the error is printed).
    """
    try:
        forecast_url = (
            "https://api.open-meteo.com/v1/forecast"
            "?latitude=38.8421&longitude=-77.2683"
            "&daily=weathercode,temperature_2m_max,temperature_2m_min,precipitation_probability_max"
            "&temperature_unit=fahrenheit&timezone=auto&forecast_days=4"
        )
        response = requests.get(forecast_url, timeout=15)
        daily = response.json().get("daily", {})

        # Short labels for the numeric weather codes, grouped by label.
        code_groups = (
            ((0, 1), "Sun"), ((2,), "PCld"), ((3,), "Ovcst"),
            ((45, 48), "Fog"),
            ((51, 53, 55), "Driz"),
            ((61, 63), "Rain"), ((65,), "HvRain"),
            ((71, 73), "Snow"), ((75,), "HvSnow"), ((77,), "Sleet"),
            ((80, 81), "Shwr"), ((82,), "HvShwr"),
            ((85,), "SnShwr"), ((86,), "HvSnShwr"),
            ((95, 96, 99), "Strm"),
        )
        wmo = {code: label for codes, label in code_groups for code in codes}

        segments = []
        for i, iso_day in enumerate(daily.get("time", [])):
            weekday = datetime.strptime(iso_day, "%Y-%m-%d").strftime("%a")
            label = wmo.get(daily["weathercode"][i], "Unk")
            high = daily["temperature_2m_max"][i]
            low = daily["temperature_2m_min"][i]
            rain_chance = daily["precipitation_probability_max"][i]
            segments.append(f"{weekday}:{label} {high:.0f}/{low:.0f}({rain_chance}%)")
        return "|".join(segments)
    except Exception as err:
        print(f"[!] Weather error: {err}")
        return None
|
|
|
|
|
|
def get_financial_snapshot():
    """Return the latest closing value for a fixed set of tickers.

    Downloads five days of data through yfinance and takes the last non-NaN
    close for each ticker.

    Returns:
        A string of ``LABEL:value`` entries (value rounded to a whole number)
        joined by "|", or None when the download fails or no ticker yields a
        value. Failures are printed, never raised.
    """
    tickers = {"SPY": "S&P", "QQQ": "NAS", "BTC-USD": "BTC", "ETH-USD": "ETH", "GOOG": "GOOG"}
    try:
        data = yf.download(list(tickers), period="5d", progress=False)
        closes = data["Close"]
    except Exception as e:
        print(f"[!] Finance error: {e}")
        return None

    snap = []
    for symbol, label in tickers.items():
        # A ticker with no usable data is skipped so the others still print.
        try:
            last_close = closes[symbol].dropna().iloc[-1]
            snap.append(f"{label}:{last_close:.0f}")
        except (KeyError, IndexError, TypeError, ValueError) as e:
            print(f"[!] No quote for {symbol}: {e}")
    return "|".join(snap) if snap else None
|
|
|
|
|
|
def get_uptime_kuma_status():
    """List active Uptime Kuma monitors whose most recent heartbeat has status 0.

    Runs sqlite3 against the Uptime Kuma database inside container 147 on
    host `pve173` over SSH. The receipt prints the result as an alert;
    presumably status 0 means "down" — confirm against the Uptime Kuma schema.

    Returns:
        Monitor names joined by ", ", or None when there are none or the
        command fails or times out.
    """
    import subprocess

    query = (
        "SELECT name FROM monitor WHERE active = 1 AND id IN "
        "(SELECT monitor_id FROM heartbeat WHERE id IN "
        "(SELECT MAX(id) FROM heartbeat GROUP BY monitor_id) AND status = 0);"
    )
    # ssh hands this single string to the remote shell, so the single quotes
    # around the SQL are interpreted on the remote side.
    remote_cmd = f"pct exec 147 -- sqlite3 /app/uptime-kuma/data/kuma.db '{query}'"
    try:
        # The timeout keeps an unreachable host from stalling the briefing.
        proc = subprocess.run(
            ["ssh", "-o", "StrictHostKeyChecking=no", "pve173", remote_cmd],
            capture_output=True, text=True, timeout=30
        )
    except (OSError, subprocess.SubprocessError) as e:
        print(f"[!] Uptime Kuma error: {e}")
        return None
    if proc.returncode != 0:
        print(f"[!] Uptime Kuma query failed (exit {proc.returncode}): {proc.stderr.strip()}")
    res = proc.stdout.strip()
    if res:
        return res.replace("\n", ", ")
    return None
|
|
|
|
|
|
def get_reddit_top():
    """Fetch the single top post from Reddit's front-page listing.

    Returns:
        A dict with ``title``, ``subreddit`` (prefixed name), ``score`` and
        ``url`` (full permalink), or None when the request fails or the
        response has an unexpected shape. Failures are printed, never raised.
    """
    try:
        resp = requests.get(
            "https://www.reddit.com/top.json?limit=1",
            headers={"User-Agent": "SethPC/1.0"},
            timeout=10
        )
        # Surface HTTP errors as such instead of as a confusing parse error.
        resp.raise_for_status()
        top = resp.json()["data"]["children"][0]["data"]
        return {
            "title": top["title"],
            "subreddit": top["subreddit_name_prefixed"],
            "score": top["score"],
            "url": f"https://www.reddit.com{top['permalink']}",
        }
    except Exception as e:
        print(f"[!] Reddit error: {e}")
        return None
|
|
|
|
|
|
# ── YOURLS ────────────────────────────────────────────────────────────────────
|
|
def _yourls_shorturl(data):
    """Extract the short URL from a YOURLS JSON reply, or return None."""
    if not isinstance(data, dict):
        return None
    if data.get("shorturl"):
        return data["shorturl"]
    nested = data.get("url")
    # Only index into "url" when it is an object; a plain string would raise.
    if isinstance(nested, dict) and nested.get("shorturl"):
        return nested["shorturl"]
    return None


def shorten_url(long_url):
    """Shorten a URL through YOURLS, requesting a short random keyword.

    The first request asks for the keyword "r" plus three random
    lowercase/digit characters. If YOURLS reports ``error:keyword``, the
    request is retried without a keyword.

    Args:
        long_url: The URL to shorten.

    Returns:
        The short URL, ``long_url`` unchanged when shortening fails, or ""
        when ``long_url`` is empty or None.
    """
    if not long_url:
        return ""
    import random
    import string

    keyword = "r" + "".join(random.choices(string.ascii_lowercase + string.digits, k=3))
    params = {
        "username": YOURLS_USER, "password": YOURLS_PASS,
        "action": "shorturl", "url": long_url,
        "keyword": keyword, "format": "json"
    }
    try:
        data = requests.get(YOURLS_URL, params=params, timeout=5).json()
        short = _yourls_shorturl(data)
        if short:
            return short
        if isinstance(data, dict) and data.get("code") == "error:keyword":
            params.pop("keyword")
            short = _yourls_shorturl(requests.get(YOURLS_URL, params=params, timeout=5).json())
            if short:
                return short
    except Exception as e:
        print(f"[!] YOURLS error: {e}")
    return long_url
|
|
|
|
|
|
# ── ESC/POS receipt builder ───────────────────────────────────────────────────
|
|
def build_receipt(articles, zfs, reddit, grafana, weather, finance, kuma):
    """Render the briefing as raw ESC/POS bytes using python-escpos' Dummy printer.

    Args:
        articles: Article dicts (``source``, ``title``, ``ai_summary`` or
            ``summary``, ``url``).
        zfs: Dict with ``alloc``, ``size``, ``cap``, ``health``, or None.
        reddit: Dict with ``subreddit``, ``title``, ``url``, or None.
        grafana: List of per-host metric strings (may be empty).
        weather: Forecast string, or None.
        finance: Market string, or None.
        kuma: Comma-separated monitor names, printed as an alert, or None.

    Any falsy dashboard argument is skipped. Every link is shortened via
    `shorten_url` and printed as a QR code plus the URL text.

    Returns:
        The accumulated ESC/POS byte stream, ending with a cut command.
    """
    from escpos.printer import Dummy as EscPosDummy
    p = EscPosDummy(profile="default")
    rule = "-" * COLS + "\n"

    def s(**kwargs):
        # All text uses Font B, which is what COLS was measured for.
        p.set(font='b', **kwargs)

    def heading(text):
        """Print one bold, left-aligned line, then switch back to normal weight."""
        s(align='left', bold=True)
        p.text(text)
        s(align='left', bold=False)

    def link_footer(url):
        """Print a QR code and the short URL padded with dashes, or a plain rule."""
        url_s = shorten_url(url)
        if url_s:
            s(align='center', bold=False)
            p.qr(url_s, native=True, size=3)
            s(align='left', bold=False)
            p.text(f"{url_s} " + "-" * max(0, COLS - len(url_s) - 1) + "\n")
        else:
            p.text(rule)

    # Header
    s(align='center', bold=True)
    p.text("Daily Briefing\n")
    s(align='center', bold=False)
    p.text(datetime.now().strftime('%A, %B %d %I:%M %p') + "\n")
    p.text("=" * COLS + "\n")

    # Dashboard
    heading("SYSTEM & WEATHER\n")
    if weather:
        p.text(f"Wx: {weather}\n")
    if kuma:
        heading(f"ALERT: {kuma}\n")
    if zfs:
        p.text(f"ZFS: {zfs['alloc']}/{zfs['size']} ({zfs['cap']}) {zfs['health']}\n")
    if grafana:
        p.text(" | ".join(grafana) + "\n")
    if finance:
        p.text(f"Mkts: {finance}\n")
    p.text(rule)

    # Reddit
    if reddit:
        heading(f"REDDIT: {reddit.get('subreddit', '')}\n")
        p.text(reddit.get('title', '') + "\n")
        link_footer(reddit.get('url', ''))

    # Articles
    for i, item in enumerate(articles, 1):
        heading(f"{i}. [{item.get('source', '')}] {item.get('title', '')}\n")
        # `or` also covers an ai_summary that is present but None or empty.
        summary = item.get('ai_summary') or item.get('summary', '')
        if summary:
            p.text(summary + "\n")
        link_footer(item.get('url', ''))

    # Footer
    p.text("\n")
    s(align='center', bold=False)
    p.text("-- end --\n\n")
    p.cut()
    return p.output
|
|
|
|
|
|
def print_receipt(raw_bytes):
    """Send raw ESC/POS bytes to the printer over a TCP socket.

    Args:
        raw_bytes: The byte stream to transmit.

    Returns:
        True once the bytes are sent and the confirmation is printed; False
        if anything raised along the way (the error is printed).
    """
    try:
        endpoint = (PRINTER_IP, PRINTER_PORT)
        conn = socket.create_connection(endpoint, timeout=10)
        # The socket closes when the `with` block exits.
        with conn:
            conn.sendall(raw_bytes)
        print(f"✓ Receipt sent to {PRINTER_IP}:{PRINTER_PORT}")
    except Exception as err:
        print(f"[!] Print failed: {err}")
        return False
    return True
|
|
|
|
|
|
# ── Main ──────────────────────────────────────────────────────────────────────
|
|
def main():
    """Gather all data, print the receipt, and mark printed articles as read.

    Articles are marked as read only when the receipt was actually sent to
    the printer, so a failed print leaves them unread for the next run.
    """
    items, token = fetch_news()
    zfs = get_zfs_status()
    reddit = get_reddit_top()
    grafana = get_grafana_metrics()
    weather = get_weather()
    finance = get_financial_snapshot()
    kuma = get_uptime_kuma_status()

    if items:
        articles = summarize_news(items, count=5, length="30-50 words")
    else:
        print("[!] No unread articles — printing dashboard only.")
        articles = []

    raw = build_receipt(articles, zfs, reddit, grafana, weather, finance, kuma)
    printed = print_receipt(raw)

    if not articles:
        return
    if printed:
        mark_read([a['id'] for a in articles], token)
    else:
        print("[!] Receipt not printed — leaving articles unread.")


if __name__ == "__main__":
    main()
|