Files

442 lines
17 KiB
Python

#!/usr/bin/env python3
"""
POS Daily Briefing — Epson TM-m30
Fetches unread news from FreshRSS, summarizes via Ollama, prints ESC/POS receipt.
"""
import os
import re
import json
import socket
import requests
import yfinance as yf
from datetime import datetime
# ── Config ────────────────────────────────────────────────────────────────────
CONFIG_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "config.json")
with open(CONFIG_PATH) as f:
config = json.load(f)
FRESHRSS_URL = config["freshrss_url"]
FRESHRSS_USER = config["freshrss_user"]
FRESHRSS_TOKEN = config["freshrss_api_key"]
OLLAMA_URL = config["ollama_url"]
OLLAMA_MODEL = config["ollama_model"]
YOURLS_URL = config["yourls_url"]
YOURLS_USER = config["yourls_user"]
YOURLS_PASS = config["yourls_pass"]
GRAFANA_URL = config["grafana_url"]
GRAFANA_TOKEN = config["grafana_token"]
REMOTE_HOST = config["remote_host"]
REMOTE_PASS = config["remote_pass"]
PRINTER_IP = config["printer_ip"]
PRINTER_PORT = int(config.get("printer_port", 9100))
# TM-m30 80mm paper, Font B — column width measured from test print
COLS = 57
# ── FreshRSS ──────────────────────────────────────────────────────────────────
def fetch_news():
try:
resp = requests.post(
f"{FRESHRSS_URL}/api/greader.php/accounts/ClientLogin",
data={"Email": FRESHRSS_USER, "Passwd": FRESHRSS_TOKEN},
timeout=15
)
if resp.status_code != 200:
print(f"[!] Login failed: {resp.status_code}")
return [], None
auth_token = next(
(l.split("=", 1)[1] for l in resp.text.splitlines() if l.startswith("Auth=")),
None
)
if not auth_token:
print("[!] No auth token in login response")
return [], None
except Exception as e:
print(f"[!] Login exception: {e}")
return [], None
headers = {"Authorization": f"GoogleLogin auth={auth_token}"}
try:
r = requests.get(
f"{FRESHRSS_URL}/api/greader.php/reader/api/0/stream/contents/user/-/state/com.google/reading-list",
params={"n": 100, "xt": "user/-/state/com.google/read", "output": "json"},
headers=headers,
timeout=15
)
if r.status_code != 200:
return [], auth_token
items = []
for item in r.json().get("items", []):
long_id = item.get("id", "")
summary = (item.get("summary", {}).get("content", "")
or item.get("content", {}).get("content", ""))
url = ""
if item.get("canonical"):
url = item["canonical"][0].get("href", "")
if not url and item.get("alternate"):
url = item["alternate"][0].get("href", "")
if not url:
item_id = long_id.split("/")[-1] if "/" in long_id else ""
url = f"https://fresh.sethpc.xyz/i/?c=entry&a=read&id={item_id}"
items.append({
"id": long_id,
"title": item.get("title", ""),
"summary": re.sub('<[^<]+?>', '', summary).strip(),
"url": url,
"source": item.get("origin", {}).get("title", "Unknown"),
})
return items, auth_token
except Exception as e:
print(f"[!] Fetch exception: {e}")
return [], auth_token
def mark_read(item_ids, auth_token):
if not item_ids or not auth_token:
return
headers = {"Authorization": f"GoogleLogin auth={auth_token}"}
try:
t = requests.get(
f"{FRESHRSS_URL}/api/greader.php/reader/api/0/token",
headers=headers, timeout=10
).text.strip()
requests.post(
f"{FRESHRSS_URL}/api/greader.php/reader/api/0/edit-tag",
headers=headers,
data={"i": item_ids, "a": "user/-/state/com.google/read", "T": t},
timeout=10
)
print(f"Marked {len(item_ids)} articles as read.")
except Exception as e:
print(f"[!] mark_read error: {e}")
# ── Ollama ────────────────────────────────────────────────────────────────────
def summarize_news(items, count=5, length="30-50 words"):
if not items:
return []
print(f"Phase 1: Selecting top {count} from {len(items)} items...")
titles_input = "\n".join([f"ID {i} [{item['source']}]: {item['title']}"
for i, item in enumerate(items)])
prompt_select = (
f"Select exactly {count} stories for a daily briefing:\n"
"1. Priorities: Tech, Science, World News, General, Health.\n"
"2. Variety: Max 2 stories per source.\n"
"3. Entertainment: Max 2 total.\n"
"4. Omit: Lottery, minor crime.\n"
"5. Sports: Football (College/NFL) only.\n"
f"Format: [0, 5, 12, ...]\n\nHEADLINES:\n{titles_input}"
)
selected_indices = []
try:
resp = requests.post(
f"{OLLAMA_URL}/api/generate",
json={"model": OLLAMA_MODEL, "prompt": prompt_select, "stream": False,
"options": {"num_ctx": 16384, "temperature": 0.1}, "format": "json"},
timeout=300
).json()
data = json.loads(resp.get("response", "[]"))
selected_indices = data if isinstance(data, list) else list(data.values())[0]
except Exception as e:
print(f"[!] Selection error: {e}")
selected_indices = list(range(min(count, len(items))))
candidates = []
source_counts = {}
for idx in selected_indices:
if isinstance(idx, int) and 0 <= idx < len(items):
item = items[idx]
src = item["source"]
if source_counts.get(src, 0) < 2:
candidates.append(item)
source_counts[src] = source_counts.get(src, 0) + 1
while len(candidates) < count:
for item in items:
if item not in candidates and source_counts.get(item["source"], 0) < 2:
candidates.append(item)
source_counts[item["source"]] = source_counts.get(item["source"], 0) + 1
if len(candidates) == count:
break
break
print(f"Phase 2: Summarizing {len(candidates)} items ({length})...")
content_input = "".join([
f"ARTICLE {i} ({item['title']}):\n{item['summary'][:2000]}\n\n"
for i, item in enumerate(candidates)
])
prompt_summary = (
"You are a professional news editor. Write a daily briefing.\n"
f"For EACH article, write a concise summary ({length}).\n"
"LANGUAGE RULE: Use English. Translate non-English/Spanish articles.\n"
"Return STRICTLY as a JSON list: [{\"id\": 0, \"summary\": \"...\"}, ...]\n\n"
f"CONTENT:\n{content_input}"
)
try:
resp = requests.post(
f"{OLLAMA_URL}/api/generate",
json={"model": OLLAMA_MODEL, "prompt": prompt_summary, "stream": False,
"options": {"num_ctx": 16384, "temperature": 0.3}},
timeout=600
).json()
raw = resp.get("response", "")
match = re.search(r'\[.*\]', raw, re.DOTALL)
if match:
summaries = json.loads(match.group(0))
else:
parsed = json.loads(raw)
summaries = parsed if isinstance(parsed, list) else list(parsed.values())[0]
final = []
for s in summaries:
idx = s.get("id")
if idx is not None and 0 <= idx < len(candidates):
candidates[idx]["ai_summary"] = s.get("summary")
final.append(candidates[idx])
return final
except Exception as e:
print(f"[!] Summarize parse error: {e} — using raw summaries")
for c in candidates:
c["ai_summary"] = c.get("summary", "")[:200]
return candidates
# ── Dashboard data ────────────────────────────────────────────────────────────
def get_zfs_status():
try:
cmd = (f"sshpass -p '{REMOTE_PASS}' ssh -o StrictHostKeyChecking=no "
f"root@{REMOTE_HOST} \"zpool list tank -H -o name,size,alloc,free,cap,health\"")
res = os.popen(cmd).read().strip().split()
if len(res) >= 6:
return {"name": res[0], "size": res[1], "alloc": res[2],
"free": res[3], "cap": res[4], "health": res[5]}
except:
pass
return None
def get_grafana_metrics():
headers = {"Authorization": f"Bearer {GRAFANA_TOKEN}"}
proxmox_hosts = ["192.168.0.173", "192.168.0.112", "192.168.0.197"]
metrics_data = {}
queries = {
"CPU": '100 * (1 - avg by(instance)(irate(node_cpu_seconds_total{mode="idle"}[5m])))',
"RAM": '100 * (1 - node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes)',
"Disk": '100 - ((node_filesystem_avail_bytes{mountpoint="/"} * 100) / node_filesystem_size_bytes{mountpoint="/"})',
}
try:
for n, q in queries.items():
r = requests.get(
f"{GRAFANA_URL}/api/datasources/proxy/1/api/v1/query",
params={"query": q}, headers=headers, timeout=10
).json()
for res in r.get("data", {}).get("result", []):
inst = res.get("metric", {}).get("instance", "").split(":")[0]
if inst in proxmox_hosts:
metrics_data.setdefault(inst, []).append(
f"{n}:{float(res.get('value', [0, 0])[1]):.1f}%"
)
return [f"H{h.split('.')[-1]}:{''.join(s)}" for h, s in metrics_data.items()]
except:
return []
def get_weather():
try:
r = requests.get(
"https://api.open-meteo.com/v1/forecast"
"?latitude=38.8421&longitude=-77.2683"
"&daily=weathercode,temperature_2m_max,temperature_2m_min,precipitation_probability_max"
"&temperature_unit=fahrenheit&timezone=auto&forecast_days=4",
timeout=15
).json()
daily = r.get("daily", {})
wmo = {
0: "Sun", 1: "Sun", 2: "PCld", 3: "Ovcst",
45: "Fog", 48: "Fog",
51: "Driz", 53: "Driz", 55: "Driz",
61: "Rain", 63: "Rain", 65: "HvRain",
71: "Snow", 73: "Snow", 75: "HvSnow", 77: "Sleet",
80: "Shwr", 81: "Shwr", 82: "HvShwr",
85: "SnShwr", 86: "HvSnShwr",
95: "Strm", 96: "Strm", 99: "Strm",
}
parts = []
for i in range(len(daily.get("time", []))):
d = datetime.strptime(daily["time"][i], "%Y-%m-%d").strftime("%a")
cond = wmo.get(daily["weathercode"][i], "Unk")
hi = daily["temperature_2m_max"][i]
lo = daily["temperature_2m_min"][i]
pop = daily["precipitation_probability_max"][i]
parts.append(f"{d}:{cond} {hi:.0f}/{lo:.0f}({pop}%)")
return "|".join(parts)
except Exception as e:
print(f"[!] Weather error: {e}")
return None
def get_financial_snapshot():
tickers = {"SPY": "S&P", "QQQ": "NAS", "BTC-USD": "BTC", "ETH-USD": "ETH", "GOOG": "GOOG"}
snap = []
try:
data = yf.download(list(tickers.keys()), period="5d", progress=False)
for t, n in tickers.items():
try:
snap.append(f"{n}:{data['Close'][t].dropna().iloc[-1]:.0f}")
except:
pass
return "|".join(snap)
except:
return None
def get_reddit_top():
try:
r = requests.get(
"https://www.reddit.com/top.json?limit=1",
headers={"User-Agent": "SethPC/1.0"},
timeout=10
).json()
top = r["data"]["children"][0]["data"]
return {
"title": top["title"],
"subreddit": top["subreddit_name_prefixed"],
"score": top["score"],
"url": f"https://www.reddit.com{top['permalink']}",
}
except:
return None
# ── YOURLS ────────────────────────────────────────────────────────────────────
def shorten_url(long_url):
if not long_url:
return ""
import random, string
keyword = "r" + "".join(random.choices(string.ascii_lowercase + string.digits, k=3))
params = {
"username": YOURLS_USER, "password": YOURLS_PASS,
"action": "shorturl", "url": long_url,
"keyword": keyword, "format": "json"
}
try:
data = requests.get(YOURLS_URL, params=params, timeout=5).json()
if "shorturl" in data:
return data["shorturl"]
if "url" in data and "shorturl" in data["url"]:
return data["url"]["shorturl"]
if data.get("code") == "error:keyword":
params.pop("keyword")
data = requests.get(YOURLS_URL, params=params, timeout=5).json()
if "shorturl" in data:
return data["shorturl"]
if "url" in data and "shorturl" in data["url"]:
return data["url"]["shorturl"]
except Exception as e:
print(f"[!] YOURLS error: {e}")
return long_url
# ── ESC/POS receipt builder ───────────────────────────────────────────────────
def build_receipt(articles, zfs, reddit, grafana, weather, finance):
from escpos.printer import Dummy as EscPosDummy
p = EscPosDummy(profile="default")
def s(**kwargs):
p.set(font='b', **kwargs)
# Header
s(align='center', bold=True)
p.text("Daily Briefing\n")
s(align='center', bold=False)
p.text(datetime.now().strftime('%A, %B %d %I:%M %p') + "\n")
p.text("=" * COLS + "\n")
# Dashboard
s(align='left', bold=True)
p.text("SYSTEM & WEATHER\n")
s(align='left', bold=False)
if weather:
p.text(f"Wx: {weather}\n")
if zfs:
p.text(f"ZFS: {zfs['alloc']}/{zfs['size']} ({zfs['cap']}) {zfs['health']}\n")
if grafana:
p.text(" | ".join(grafana) + "\n")
if finance:
p.text(f"Mkts: {finance}\n")
p.text("-" * COLS + "\n")
# Reddit
if reddit:
s(align='left', bold=True)
p.text(f"REDDIT: {reddit.get('subreddit', '')}\n")
s(align='left', bold=False)
p.text(reddit['title'] + "\n")
url_s = shorten_url(reddit.get('url', ''))
if url_s:
s(align='center', bold=False)
p.qr(url_s, native=True, size=3)
s(align='left', bold=False)
p.text(f"{url_s} " + "-" * max(0, COLS - len(url_s) - 1) + "\n")
else:
p.text("-" * COLS + "\n")
# Articles
for i, item in enumerate(articles, 1):
s(align='left', bold=True)
p.text(f"{i}. [{item.get('source', '')}] {item.get('title', '')}\n")
s(align='left', bold=False)
summary = item.get('ai_summary', item.get('summary', ''))
if summary:
p.text(summary + "\n")
url_s = shorten_url(item.get('url', ''))
if url_s:
s(align='center', bold=False)
p.qr(url_s, native=True, size=3)
s(align='left', bold=False)
p.text(f"{url_s} " + "-" * max(0, COLS - len(url_s) - 1) + "\n")
else:
p.text("-" * COLS + "\n")
# Footer
p.text("\n")
s(align='center', bold=False)
p.text("-- end --\n\n")
p.cut()
return p.output
def print_receipt(raw_bytes):
try:
with socket.create_connection((PRINTER_IP, PRINTER_PORT), timeout=10) as sock:
sock.sendall(raw_bytes)
print(f"✓ Receipt sent to {PRINTER_IP}:{PRINTER_PORT}")
return True
except Exception as e:
print(f"[!] Print failed: {e}")
return False
# ── Main ──────────────────────────────────────────────────────────────────────
def main():
items, token = fetch_news()
zfs = get_zfs_status()
reddit = get_reddit_top()
grafana = get_grafana_metrics()
weather = get_weather()
finance = get_financial_snapshot()
if not items:
print("[!] No unread articles — printing dashboard only.")
articles = summarize_news(items, count=5, length="30-50 words") if items else []
raw = build_receipt(articles, zfs, reddit, grafana, weather, finance)
print_receipt(raw)
if articles:
mark_read([a['id'] for a in articles], token)
if __name__ == "__main__":
main()