Release v2.0.0 with dual-nail control and hardened safety

This commit is contained in:
2026-03-12 02:06:42 +00:00
parent cd07703f67
commit c4c86747e5
10 changed files with 2046 additions and 451 deletions
+352 -28
View File
@@ -19,7 +19,7 @@ import logging
import os
import csv
import math
from datetime import datetime
from datetime import datetime, timedelta
log = logging.getLogger(__name__)
@@ -73,6 +73,18 @@ class PIDController:
self._autotune_last_result = None
self._autotune_message = ""
# Sensor plausibility state
self._stale_reference_temp = None
self._stale_reference_time = None
# Flight mode state machine
self._mode = "grounded"
self._target_setpoint = self._setpoint
self._mode_started_at = None
self._mode_from_temp = None
self._takeoff_effective_seconds = None
self._scheduler_last_trigger = None
# PID instance
from simple_pid import PID
pid_cfg = config.get("pid")
@@ -118,42 +130,272 @@ class PIDController:
self._log_writer = csv.writer(self._log_file)
self._log_writer.writerow([
"timestamp", "temp_f", "setpoint_f", "output", "relay",
"kP", "kI", "kD", "loop_size_ms"
"flight_setpoint_f", "mode", "kP", "kI", "kD", "loop_size_ms"
])
log.info("Log file created: %s", path)
except Exception as e:
log.error("Failed to create log file: %s", e)
def start(self):
"""Enable the controller and start the background control loop."""
"""Enable power/PID control and ensure background monitor loop is running."""
self.start_cruise()
log.info("PID power enabled (setpoint=%.0fF)", self._setpoint)
def start_monitoring(self):
"""Start background temperature monitoring loop with power disabled."""
self._start_thread_if_needed()
with self._lock:
self._enabled = False
self._enter_mode("grounded", self._temp)
log.info("Background monitoring started (power disabled)")
def set_power(self, enabled):
"""Toggle power/PID control while keeping monitoring loop running."""
if enabled:
self.start_cruise()
log.info("PID power enabled")
return
self._start_thread_if_needed()
with self._lock:
self._enabled = False
self._enter_mode("grounded", self._temp)
log.info("PID power disabled")
self._relay_off()
def _start_thread_if_needed(self):
"""Start background control thread once, if not already running."""
with self._lock:
if self._thread is not None and self._thread.is_alive():
log.warning("Controller is already running")
return
self._stop_event.clear()
self._thread = threading.Thread(target=self._control_loop, daemon=True, name="pid-loop")
self._thread.start()
log.info("Control loop thread started")
def stop(self):
"""Stop the monitoring/controller thread and turn off the relay."""
log.info("Stopping PID controller thread")
self._stop_event.set()
with self._lock:
self._enabled = False
self._autotune_active = False
self._enter_mode("grounded", self._temp)
self._relay_off()
if self._thread is not None:
self._thread.join(timeout=5)
log.info("PID controller thread stopped")
def _enter_mode(self, mode, now_temp=None):
self._mode = mode
self._mode_started_at = time.monotonic()
if now_temp is not None:
self._mode_from_temp = float(now_temp)
else:
self._mode_from_temp = self._temp
if mode != "takeoff":
self._takeoff_effective_seconds = None
self._config.update_section("flight", {"mode": mode})
log.info("Flight mode -> %s", mode)
def _compute_takeoff_duration(self):
"""Scale takeoff duration by how hot the coil already is."""
flight = self._config.get("flight")
base_takeoff = max(5.0, float(flight.get("takeoff_seconds", 300)))
ambient = float(flight.get("ambient_temp_f", 75))
start_temp = self._mode_from_temp if self._mode_from_temp is not None else self._temp
full_delta = self._target_setpoint - ambient
remaining_delta = self._target_setpoint - start_temp
if full_delta <= 1.0:
return base_takeoff
if remaining_delta <= 0:
return 0.0
ratio = max(0.0, min(1.0, remaining_delta / full_delta))
return max(5.0, base_takeoff * ratio)
def _compute_flight_setpoint(self, temp):
flight = self._config.get("flight")
takeoff_s = max(1.0, float(flight.get("takeoff_seconds", 90)))
descent_s = max(1.0, float(flight.get("descent_seconds", 90)))
descent_target = float(flight.get("descent_target_f", 120))
turbo = bool(flight.get("turbo", False))
if self._mode == "grounded":
self._enabled = False
return self._setpoint
if self._mode == "takeoff":
if turbo:
self._enter_mode("cruise", temp)
return self._target_setpoint
start_temp = self._mode_from_temp if self._mode_from_temp is not None else temp
effective_takeoff = self._takeoff_effective_seconds
if effective_takeoff is None:
effective_takeoff = self._compute_takeoff_duration()
self._takeoff_effective_seconds = effective_takeoff
if effective_takeoff <= 0:
self._enter_mode("cruise", temp)
return self._target_setpoint
elapsed = max(0.0, time.monotonic() - (self._mode_started_at or time.monotonic()))
p = min(1.0, elapsed / effective_takeoff)
ramp = start_temp + (self._target_setpoint - start_temp) * p
if p >= 1.0:
self._enter_mode("cruise", temp)
return self._target_setpoint
return ramp
if self._mode == "descent":
if turbo:
self._enter_mode("grounded", temp)
self._enabled = False
return descent_target
start_sp = self._mode_from_temp if self._mode_from_temp is not None else self._setpoint
elapsed = max(0.0, time.monotonic() - (self._mode_started_at or time.monotonic()))
p = min(1.0, elapsed / descent_s)
ramp = start_sp + (descent_target - start_sp) * p
if p >= 1.0:
self._enabled = False
self._enter_mode("grounded", temp)
return ramp
# cruise
return self._target_setpoint
def _scheduler_check(self):
sched = self._config.get("scheduler")
if not sched.get("enabled", False):
return
times = sched.get("cutoff_times", [])
if not isinstance(times, list):
return
now = datetime.now()
key = now.strftime("%Y-%m-%d %H:%M")
hhmm = now.strftime("%H:%M")
if key == self._scheduler_last_trigger:
return
if hhmm in times and self._mode not in ("grounded", "descent"):
self._scheduler_last_trigger = key
self.start_descent()
log.info("Scheduler triggered descent at %s", hhmm)
def _mode_eta_seconds(self):
if self._mode not in ("takeoff", "descent"):
return None
started = self._mode_started_at
if started is None:
return None
flight = self._config.get("flight")
if self._mode == "takeoff":
total = self._takeoff_effective_seconds
if total is None:
total = float(flight.get("takeoff_seconds", 300))
else:
total = float(flight.get("descent_seconds", 300))
remaining = max(0.0, total - (time.monotonic() - started))
return round(remaining, 1)
def _next_cutoff_seconds(self):
sched = self._config.get("scheduler")
if not sched.get("enabled", False):
return None
times = sched.get("cutoff_times", [])
if not isinstance(times, list) or not times:
return None
now = datetime.now()
best_seconds = None
for t in times:
try:
hh, mm = t.split(":", 1)
hhv = int(hh)
mmv = int(mm)
except Exception:
continue
target = now.replace(hour=hhv, minute=mmv, second=0, microsecond=0)
if target <= now:
target = target + timedelta(days=1)
delta_sec = (target - now).total_seconds()
if best_seconds is None or delta_sec < best_seconds:
best_seconds = delta_sec
if best_seconds is None:
return None
return round(best_seconds, 1)
def start_takeoff(self):
self._start_thread_if_needed()
with self._lock:
if self._autotune_active:
self._autotune_active = False
self._autotune_message = "Autotune stopped: flight mode takeoff"
self._enabled = True
self._safety_tripped = False
self._safety_reason = ""
self._start_time = time.monotonic()
self._idle_since = None
self._stop_event.clear()
self._pid.reset()
self._target_setpoint = self._setpoint
self._enter_mode("takeoff", self._temp)
self._takeoff_effective_seconds = self._compute_takeoff_duration()
log.info(
"Takeoff effective duration %.1fs (base %.1fs)",
self._takeoff_effective_seconds,
float(self._config.get("flight").get("takeoff_seconds", 300)),
)
self._thread = threading.Thread(target=self._control_loop, daemon=True, name="pid-loop")
self._thread.start()
log.info("PID controller started (setpoint=%.0fF)", self._setpoint)
def stop(self):
"""Stop the controller and turn off the relay."""
log.info("Stopping PID controller")
self._stop_event.set()
def start_descent(self):
with self._lock:
self._enabled = False
self._autotune_active = False
self._relay_off()
if self._thread is not None:
self._thread.join(timeout=5)
log.info("PID controller stopped")
if self._autotune_active:
self._autotune_active = False
self._autotune_message = "Autotune stopped: flight mode descent"
if self._mode == "grounded":
return
self._enter_mode("descent", self._setpoint)
self._takeoff_effective_seconds = None
def start_cruise(self):
self._start_thread_if_needed()
with self._lock:
self._enabled = True
self._safety_tripped = False
self._safety_reason = ""
self._start_time = time.monotonic()
self._idle_since = None
self._pid.reset()
self._target_setpoint = self._setpoint
self._enter_mode("cruise", self._temp)
self._takeoff_effective_seconds = None
def set_flight_config(self, takeoff_seconds=None, descent_seconds=None, turbo=None, descent_target_f=None):
flight = self._config.get("flight")
updates = {}
if takeoff_seconds is not None:
updates["takeoff_seconds"] = float(takeoff_seconds)
if descent_seconds is not None:
updates["descent_seconds"] = float(descent_seconds)
if turbo is not None:
updates["turbo"] = bool(turbo)
if descent_target_f is not None:
updates["descent_target_f"] = float(descent_target_f)
if updates:
flight.update(updates)
self._config.update_section("flight", flight)
def set_scheduler(self, enabled=None, cutoff_times=None):
sched = self._config.get("scheduler")
if enabled is not None:
sched["enabled"] = bool(enabled)
if cutoff_times is not None:
sched["cutoff_times"] = list(cutoff_times)
self._config.update_section("scheduler", sched)
def _relay_off(self):
"""Ensure relay is OFF."""
@@ -176,6 +418,39 @@ class PIDController:
log.error("Failed to set relay: %s", e)
self._relay_on = on
def _reset_stale_tracker(self, temp=None):
self._stale_reference_temp = temp
self._stale_reference_time = time.monotonic()
def _check_sensor_stale(self, temp, output, loop_size, safety):
"""Trip safety if sensor appears stuck while heater drive is high."""
if temp is None:
return False
delta_limit = float(safety.get("sensor_stale_delta_f", 0.8))
stale_seconds = float(safety.get("sensor_stale_seconds", 8.0))
high_ratio = float(safety.get("stale_output_ratio", 0.65))
if self._stale_reference_time is None or self._stale_reference_temp is None:
self._reset_stale_tracker(temp)
return False
# Only enforce stale detection while requesting strong heat output.
if output < (high_ratio * loop_size):
self._reset_stale_tracker(temp)
return False
delta = abs(temp - self._stale_reference_temp)
if delta >= delta_limit:
self._reset_stale_tracker(temp)
return False
elapsed = time.monotonic() - self._stale_reference_time
if elapsed >= stale_seconds:
self._trip_safety("Sensor stale while high heater demand")
return True
return False
def _reset_autotune_state(self):
self._autotune_heating = False
self._autotune_phase_started = None
@@ -185,6 +460,11 @@ class PIDController:
def start_autotune(self, setpoint=None, hysteresis=None, cycles=None):
with self._lock:
if self._mode in ("takeoff", "descent"):
self._autotune_active = False
self._autotune_message = "Autotune disabled during {}".format(self._mode)
log.warning(self._autotune_message)
return
if setpoint is not None:
self._setpoint = float(setpoint)
self._config.set("control", "setpoint", float(setpoint))
@@ -303,12 +583,11 @@ class PIDController:
Within each cycle, the relay is ON for a proportion of time equal to
the PID output, implementing time-proportional software PWM.
"""
log.info("Control loop thread started")
try:
while not self._stop_event.is_set():
# Reload config if file changed
self._config.reload_if_changed()
self._scheduler_check()
# Apply current PID tuning
pid_cfg = self._config.get("pid")
@@ -324,29 +603,47 @@ class PIDController:
self._pid.output_limits = (0, loop_size)
with self._lock:
self._pid.setpoint = self._setpoint
# Read temperature
temp = self._tc.read()
if temp is None:
log.error("Thermocouple read returned None, disabling for safety")
log.error("Thermocouple read returned None, disabling power for safety")
self._trip_safety("Thermocouple disconnected")
break
time.sleep(0.2)
continue
with self._lock:
self._temp = temp
flight_setpoint = self._compute_flight_setpoint(temp)
with self._lock:
self._pid.setpoint = flight_setpoint
# Safety checks
safety = self._config.get("safety")
if temp > safety["max_temp_f"]:
self._trip_safety("Temperature {:.0f}F exceeds max {}F".format(temp, safety['max_temp_f']))
break
time.sleep(0.2)
continue
if not self._tc.is_connected:
self._trip_safety("Thermocouple disconnected")
break
time.sleep(0.2)
continue
if not self._enabled:
self._relay_off()
self._reset_stale_tracker(temp)
with self._lock:
self._output = 0.0
self._last_loop_time = time.monotonic()
self._loop_count += 1
self._log_counter += 1
if self._log_counter >= log_resolution:
self._log_data_point(temp, 0.0)
self._log_counter = 0
time.sleep(max(0.2, sleep_time))
continue
# Idle shutoff check
idle_minutes = safety.get("idle_shutoff_minutes", 0)
@@ -376,6 +673,13 @@ class PIDController:
with self._lock:
self._temp = temp
if temp > safety["max_temp_f"]:
self._trip_safety("Temperature {:.0f}F exceeds max {}F".format(temp, safety['max_temp_f']))
break
if not self._tc.is_connected:
self._trip_safety("Thermocouple disconnected")
break
if self._autotune_active:
output = self._update_autotune(temp, loop_size)
else:
@@ -383,6 +687,9 @@ class PIDController:
with self._lock:
self._output = output
if self._check_sensor_stale(temp, output, loop_size, safety):
break
# Software PWM: relay ON for first `output` ms of the cycle
elapsed_ms = time.monotonic() * 1000 - start_ms
should_be_on = elapsed_ms < output
@@ -409,22 +716,30 @@ class PIDController:
def _trip_safety(self, reason):
"""Trip safety shutdown."""
if self._safety_tripped and self._safety_reason == reason:
self._enabled = False
self._relay_off()
return
log.warning("SAFETY TRIP: %s", reason)
with self._lock:
self._safety_tripped = True
self._safety_reason = reason
self._enabled = False
self._enter_mode("grounded", self._temp)
self._relay_off()
def _log_data_point(self, temp, output):
"""Write a data point to the CSV log and in-memory history."""
now = time.time()
flight_setpoint = self._pid.setpoint
row = [
"{:.3f}".format(now),
"{:.2f}".format(temp),
"{:.2f}".format(self._setpoint),
"{:.2f}".format(output),
1 if self._relay_on else 0,
"{:.2f}".format(flight_setpoint),
self._mode,
"{:.4f}".format(self._pid.Kp),
"{:.4f}".format(self._pid.Ki),
"{:.4f}".format(self._pid.Kd),
@@ -444,6 +759,8 @@ class PIDController:
"timestamp": now,
"temp": round(temp, 2),
"setpoint": round(self._setpoint, 2),
"flight_setpoint": round(flight_setpoint, 2),
"mode": self._mode,
"output": round(output, 2),
"relay": self._relay_on
}
@@ -459,6 +776,7 @@ class PIDController:
with self._lock:
old_setpoint = self._setpoint
self._setpoint = float(value)
self._target_setpoint = float(value)
self._idle_since = None # Reset idle timer on setpoint change
if abs(self._setpoint - old_setpoint) >= 10:
self._pid.reset()
@@ -488,8 +806,10 @@ class PIDController:
return {
"enabled": self._enabled,
"mode": self._mode,
"temp": round(self._temp, 2),
"setpoint": round(self._setpoint, 2),
"effective_setpoint": round(self._pid.setpoint, 2),
"output": round(self._output, 2),
"relay_on": self._relay_on,
"loop_count": self._loop_count,
@@ -515,6 +835,10 @@ class PIDController:
"message": self._autotune_message,
"last_result": self._autotune_last_result,
},
"flight": self._config.get("flight"),
"scheduler": self._config.get("scheduler"),
"mode_eta_seconds": self._mode_eta_seconds(),
"next_cutoff_seconds": self._next_cutoff_seconds(),
}
@property