#!/usr/bin/env python3 """ Kitty-Workbench Demo ==================== Run this in a kitty terminal to see the interactive display panel. Usage: python3 demo.py What it does: 1. Opens a Unix socket server 2. Splits your kitty window and launches the TUI in the right pane 3. Pushes a diagnostic scenario (Heathkit IO-102 oscilloscope focus repair) 4. Leaves the TUI running so you can interact with the checklist Press Ctrl+C to close. """ import asyncio import os import sys import time from pathlib import Path # Ensure kitty_workbench is importable sys.path.insert(0, str(Path(__file__).parent / "src")) from kitty_workbench.protocol import ( encode_message, decode_message, ReadyEvent, InitCmd, DisplayCmd, LayoutCmd, LogCmd, ShutdownCmd, ) from kitty_workbench.project import create_project async def main(): project_name = "demo-io102" title = "Heathkit IO-102 Focus Diagnostic" sock_path = f"/tmp/kitt-{project_name}.sock" # Clean up stale socket if os.path.exists(sock_path): os.unlink(sock_path) # Ensure project dir exists create_project(project_name, title) # -- Socket server -- ready = asyncio.Event() tui_writer = None async def on_connect(reader, writer): nonlocal tui_writer tui_writer = writer while True: line = await reader.readline() if not line: break msg = decode_message(line.decode().strip()) if isinstance(msg, ReadyEvent): ready.set() elif msg is not None: # Print user events to this terminal from dataclasses import asdict print(f" [event] {asdict(msg)}") server = await asyncio.start_unix_server(on_connect, path=sock_path) # -- Launch TUI in a kitty split -- print(f"Launching Kitty-Workbench TUI...") tui_cmd = [ sys.executable, "-m", "kitty_workbench", "tui", project_name, "--socket", sock_path, ] # Try kitty split first, fall back to tmux, then a new window launched = False if os.environ.get("KITTY_PID") or os.environ.get("KITTY_WINDOW_ID"): # We're inside kitty — use native split import subprocess result = subprocess.run( ["kitty", "@", "launch", "--location=vsplit", "--title", title] + tui_cmd, capture_output=True, text=True, ) if result.returncode == 0: print(f" Opened kitty split pane (id: {result.stdout.strip()})") launched = True if not launched and os.environ.get("TMUX"): import subprocess result = subprocess.run( ["tmux", "split-window", "-h", "-d", "-P", "-F", "#{pane_id}"] + tui_cmd, capture_output=True, text=True, ) if result.returncode == 0: print(f" Opened tmux split pane ({result.stdout.strip()})") launched = True if not launched: # Last resort: try kitty @ anyway (might work with allow_remote_control) import subprocess result = subprocess.run( ["kitty", "@", "launch", "--location=vsplit", "--title", title] + tui_cmd, capture_output=True, text=True, ) if result.returncode == 0: print(f" Opened kitty split pane (id: {result.stdout.strip()})") launched = True else: print(f" Could not auto-split. Run this in another terminal:") print(f" {' '.join(tui_cmd)}") print(f" Waiting for TUI to connect...") # Wait for TUI to connect try: await asyncio.wait_for(ready.wait(), timeout=15) except asyncio.TimeoutError: print("TUI did not connect within 15s. Is kitty remote control enabled?") print("Add to ~/.config/kitty/kitty.conf:") print(" allow_remote_control yes") server.close() return print(" TUI connected!\n") async def send(cmd): tui_writer.write((encode_message(cmd) + "\n").encode()) await tui_writer.drain() await asyncio.sleep(0.4) # -- Push demo content -- print("Pushing diagnostic scenario...") await send(InitCmd( project=project_name, title=title, image_protocol="none", )) await send(LayoutCmd(panes={ "main": {"ratio": 2}, "sidebar": {"ratio": 1, "position": "right"}, "log": {"ratio": 1, "position": "bottom"}, })) await send(DisplayCmd( widget="markdown", content="""# HV Focus Circuit Diagnostic ## CRT Focus Voltage Divider The focus voltage is derived from the HV supply through a resistive divider: - **R412** (910K) + **R413** (2.2M) + **R414** (1M) - Expected focus voltage: ~2.1kV at CRT pin 6 - Measured: **1.8kV — low by 300V** ## Probable Cause Carbon composition resistors R412-R414 have drifted with age. R412 shows **+16.7% drift** — replacing with metal film. ## Circuit ``` HV Supply (5.2kV) │ [R414] 1M │ ├──── Focus pin (CRT pin 6) │ [R413] 2.2M │ [R412] 910K ◄── DRIFTED to 1.05M │ GND ``` """, pane="main", clear=True, )) await send(DisplayCmd( widget="checklist", items=[ {"label": "Measure R412 (910K)", "checked": True}, {"label": "Measure R413 (2.2M)", "checked": True}, {"label": "Measure C201 ESR", "checked": True}, {"label": "Replace R412", "checked": False}, {"label": "Re-measure focus voltage", "checked": False}, {"label": "Verify CRT focus", "checked": False}, ], pane="sidebar", clear=True, )) await send(LogCmd(entry="R412: 1.05M (expected 910K) — FAIL +16.7%", level="warning")) await send(LogCmd(entry="R413: 2.18M (expected 2.2M) — PASS", level="success")) await send(LogCmd(entry="C201 ESR: 0.3Ω — PASS", level="success")) await send(LogCmd(entry="Replacing R412 with 910K 1% metal film", level="info")) print("Demo loaded! Interact with the checklist in the TUI pane.") print("Events from the TUI will appear below.\n") print("Press Ctrl+C to close.\n") # Keep running and print events try: while True: await asyncio.sleep(1) except (KeyboardInterrupt, asyncio.CancelledError): print("\nShutting down...") try: await send(ShutdownCmd()) except Exception: pass server.close() if os.path.exists(sock_path): os.unlink(sock_path) if __name__ == "__main__": asyncio.run(main())