# Files
#
# 200 lines
# 6.9 KiB
# Python

"""
piNail2 Thermocouple Driver
Wraps the MAX6675 thermocouple reader with:
- Celsius to Fahrenheit conversion
- Spike/outlier filtering (median of recent readings)
- Open thermocouple detection
- Error handling
"""
import logging
import collections
import math
import importlib
import types
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
def c_to_f(c):
    """Return the Fahrenheit equivalent of the Celsius temperature ``c``."""
    scaled = c * 9.0 / 5.0
    return scaled + 32.0
class Thermocouple:
    """
    MAX6675 thermocouple reader with spike filtering.

    The MAX6675 occasionally returns spurious readings (we've seen 869F spikes
    in otherwise stable ~680F data). This class maintains a sliding window of
    recent readings and rejects outliers.

    The sensor is read in Celsius; every temperature exposed by this class is
    in Fahrenheit.
    """

    # Readings above this many degrees Celsius are treated as an open or
    # invalid probe rather than as a real temperature.
    _MAX_VALID_C = 1023

    # (module path, attribute name) pairs tried in order to locate the MAX6675
    # constructor. An attribute name of None means "start from the module".
    _SENSOR_CANDIDATES = (
        ("MAX6675.MAX6675", "MAX6675"),
        ("MAX6675.MAX6675.MAX6675", "MAX6675"),
        ("MAX6675.MAX6675", None),
        ("MAX6675.MAX6675.MAX6675", None),
    )

    def __init__(self, clk, cs, do, spike_threshold=50.0, window_size=5):
        """
        Set up filter state and try to create the MAX6675 sensor object.

        Initialization failures are logged, not raised: the instance is still
        usable, but read() returns None and is_connected is False.

        Args:
            clk: GPIO pin for SPI clock
            cs: GPIO pin for chip select
            do: GPIO pin for data out (MISO)
            spike_threshold: Maximum allowed jump between filtered readings (in F)
            window_size: Number of recent readings to keep for median filtering
        """
        self._spike_threshold = spike_threshold
        self._window_size = window_size
        self._readings = collections.deque(maxlen=window_size)
        self._last_good_temp = None
        self._raw_temp = 0.0
        self._filtered_temp = 0.0
        self._is_connected = False
        self._spike_count = 0
        self._total_reads = 0
        try:
            sensor_ctor = self._find_sensor_ctor()
            if not callable(sensor_ctor):
                raise TypeError("MAX6675 constructor unavailable")
            self._sensor = sensor_ctor(clk, cs, do)
            self._is_connected = True
            # %s rather than %d so non-integer pin identifiers do not break
            # log formatting.
            log.info(
                "MAX6675 thermocouple initialized (CLK=%s, CS=%s, DO=%s)",
                clk, cs, do,
            )
        except Exception as e:
            # Deliberately broad: a missing or broken driver must not crash
            # the caller; it is reported through is_connected / read().
            log.error("Failed to initialize MAX6675: %s", e)
            self._sensor = None
            self._is_connected = False

    @classmethod
    def _find_sensor_ctor(cls):
        """Return the first callable MAX6675 constructor found, or None."""
        for module_name, attr_name in cls._SENSOR_CANDIDATES:
            try:
                module = importlib.import_module(module_name)
            except Exception as e:
                # Best-effort probing: note why this candidate was skipped
                # instead of swallowing the failure silently.
                log.debug("MAX6675 candidate %s not importable: %s", module_name, e)
                continue
            obj = getattr(module, attr_name, None) if attr_name else module
            if isinstance(obj, types.ModuleType):
                # The name resolved to a (sub)module; look for the class in it.
                obj = getattr(obj, "MAX6675", None)
            if callable(obj):
                return obj
        return None

    def read(self):
        """
        Read temperature from the thermocouple.

        Returns:
            float: Filtered temperature in Fahrenheit.
            None if the sensor could not be initialized.
            On a read error or an open/invalid probe frame, the last good
            temperature is returned instead (None if there has been no good
            reading yet) and is_connected becomes False.
        """
        if self._sensor is None:
            return None
        self._total_reads += 1
        try:
            raw_c = self._sensor.readTempC()
        except Exception as e:
            log.error("Thermocouple read error: %s", e)
            self._is_connected = False
            return self._last_good_temp

        raw_c = self._to_valid_celsius(raw_c)
        if raw_c is None:
            self._is_connected = False
            return self._last_good_temp

        self._is_connected = True
        raw_f = c_to_f(raw_c)
        self._raw_temp = raw_f

        # Spike detection: if we have a previous good reading, check for
        # unreasonable jumps.
        if self._last_good_temp is not None:
            delta = abs(raw_f - self._last_good_temp)
            if delta > self._spike_threshold:
                return self._handle_spike(raw_f, delta)

        return self._accept_reading(raw_f)

    @classmethod
    def _to_valid_celsius(cls, raw_c):
        """Return raw_c as a finite in-range float, or None (with a warning)."""
        # Check for open thermocouple / invalid frame.
        # MAX6675 open probe often reports NaN; guard all non-finite values.
        if raw_c is not None:
            try:
                raw_c = float(raw_c)
            except (TypeError, ValueError):
                pass
            else:
                if math.isfinite(raw_c) and raw_c <= cls._MAX_VALID_C:
                    return raw_c
        log.warning("Thermocouple appears disconnected (raw_c=%s)", raw_c)
        return None

    def _handle_spike(self, raw_f, delta):
        """Record a reading that jumped past the threshold; return the value to report."""
        self._spike_count += 1
        log.warning(
            "Spike detected: %.1fF -> %.1fF (delta=%.1fF, count=%d). Using last good value.",
            self._last_good_temp, raw_f, delta, self._spike_count
        )
        # Still add to window but return last good, so a sustained change can
        # take over the window.
        self._readings.append(raw_f)
        # If we've had many consecutive "spikes", the temperature probably
        # genuinely changed fast (e.g., touching the nail with concentrate).
        if self._spike_count >= self._window_size:
            log.info("Spike count exceeded window size, accepting new baseline %.1fF", raw_f)
            self._spike_count = 0
            self._last_good_temp = raw_f
            self._filtered_temp = raw_f
            return raw_f
        self._filtered_temp = self._last_good_temp
        return self._last_good_temp

    def _accept_reading(self, raw_f):
        """Add a plausible reading to the window and return the filtered value."""
        # Normal reading: reset spike counter, add to window.
        self._spike_count = 0
        self._readings.append(raw_f)
        filtered = raw_f
        if len(self._readings) >= 3:
            ordered = sorted(self._readings)
            # Middle element; the upper of the two middles for even lengths.
            median = ordered[len(ordered) // 2]
            # The window may still hold rejected spikes. If they dominate it,
            # the median is one of those spikes; reporting it would emit the
            # very value that was just rejected and make the following real
            # readings look like spikes. Trust the median only while it stays
            # within the spike threshold of the reading just accepted.
            if abs(median - raw_f) <= self._spike_threshold:
                filtered = median
        self._filtered_temp = filtered
        self._last_good_temp = filtered
        return filtered

    @property
    def raw_temp(self):
        """Last raw (unfiltered) temperature reading in Fahrenheit."""
        return self._raw_temp

    @property
    def filtered_temp(self):
        """Last filtered temperature reading in Fahrenheit."""
        return self._filtered_temp

    @property
    def is_connected(self):
        """Whether the thermocouple appears to be connected."""
        return self._is_connected

    @property
    def spike_count(self):
        """Number of spike events detected since last normal reading."""
        return self._spike_count

    @property
    def stats(self):
        """Return diagnostic stats; numbers are rounded to 2 decimals, non-finite values become None."""
        def safe(value):
            if isinstance(value, (int, float)) and math.isfinite(float(value)):
                return round(float(value), 2)
            return None

        return {
            "raw_temp": safe(self._raw_temp),
            "filtered_temp": safe(self._filtered_temp),
            "is_connected": self._is_connected,
            "total_reads": self._total_reads,
            "recent_readings": [safe(r) for r in self._readings],
        }