# Source: piNail/piNail2/config.py (Python; 271 lines, 7.8 KiB as originally published)

"""
piNail2 Configuration Management
JSON-based config with named fields, load/save, defaults, and hot-reload support.
Replaces the old positional P_I_D_values.txt format.
"""
import copy
import json
import logging
import os
import threading
log = logging.getLogger(__name__)


def _nail_defaults(log_directory, relay_pin, clk, cs, do):
    """Return a freshly built default settings tree for one nail.

    Every call creates new dict and list objects, so trees returned by
    separate calls never share mutable state.

    Args:
        log_directory: Value stored under ``logging.log_directory``.
        relay_pin: Value stored under ``gpio.relay_pin``.
        clk: Value stored under ``gpio.clk``.
        cs: Value stored under ``gpio.cs``.
        do: Value stored under ``gpio.do``.

    Returns:
        A dict with the sections ``pid``, ``control``, ``flight``,
        ``scheduler``, ``safety``, ``gpio``, ``logging`` and ``autotune``,
        in that order.
    """
    # NOTE(review): the ``_f`` / ``_ms`` / ``_seconds`` suffixes presumably
    # denote Fahrenheit / milliseconds / seconds, and the pin numbering
    # scheme is not visible in this module -- confirm against the consumers.
    return {
        "pid": {
            "kP": 113.1768,
            "kI": 3.5335,
            "kD": 500.0,
            "proportional_on_measurement": False,
        },
        "control": {
            "setpoint": 530,
            "loop_size_ms": 3000,
            "sleep_time": 0.4,
            "enabled": False,
        },
        "flight": {
            "mode": "grounded",
            "takeoff_seconds": 300,
            "descent_seconds": 300,
            "turbo": False,
            "descent_target_f": 120,
            "ambient_temp_f": 75,
        },
        "scheduler": {
            "enabled": True,
            "cutoff_times": ["23:00"],
        },
        "safety": {
            "max_temp_f": 750,
            "spike_threshold_f": 50.0,
            "idle_shutoff_minutes": 30,
            "watchdog_timeout_s": 10,
            "min_temp_f": 0,
            "sensor_stale_seconds": 8,
            "sensor_stale_delta_f": 0.8,
            "stale_output_ratio": 0.65,
        },
        "gpio": {
            "relay_pin": relay_pin,
            "clk": clk,
            "cs": cs,
            "do": do,
        },
        "logging": {
            "log_resolution": 1,
            "log_directory": log_directory,
            "max_log_lines": 10000,
        },
        "autotune": {
            "hysteresis_f": 8.0,
            "cycles": 4,
        },
    }


def _build_default_config():
    """Assemble the complete default configuration tree.

    The top-level sections hold the same values as a single nail (with the
    pins also used by ``nail1``), plus the global ``presets`` and ``web``
    sections and the per-nail trees under ``nails``.

    Returns:
        The dict bound to ``DEFAULT_CONFIG``.
    """
    config = _nail_defaults("./logs", relay_pin=2, clk=3, cs=14, do=4)

    # "autotune" must come after "presets" and "web" so that the key order
    # (and therefore the layout of a freshly written config file) is
    # pid, control, flight, scheduler, safety, gpio, logging, presets, web,
    # autotune, nails.
    autotune = config.pop("autotune")
    config["presets"] = {
        "Low Temp": 450,
        "Medium": 530,
        "High": 650,
    }
    config["web"] = {
        "host": "0.0.0.0",
        "port": 5000,
        "update_interval_ms": 500,
    }
    config["autotune"] = autotune
    config["nails"] = {
        "nail1": _nail_defaults(
            "./logs/nail1", relay_pin=2, clk=3, cs=14, do=4
        ),
        "nail2": _nail_defaults(
            "./logs/nail2", relay_pin=22, clk=27, cs=18, do=17
        ),
    }
    return config


# Built once at import time. Treat as read-only: take a copy.deepcopy()
# before modifying, otherwise the shared defaults change for everyone.
DEFAULT_CONFIG = _build_default_config()
class Config:
    """Thread-safe configuration manager with file persistence.

    The in-memory config starts as a deep copy of ``DEFAULT_CONFIG``; values
    read from the JSON file are merged over it, so keys missing from the file
    keep their defaults.  Every public method holds one re-entrant lock while
    it reads or replaces the config tree, and values crossing the API
    boundary are deep-copied so callers never alias internal state.
    """

    def __init__(self, config_path="config.json"):
        """Create the manager and load (or create) the config file.

        Args:
            config_path: Path of the JSON file to read from and write to.
        """
        self._path = config_path
        # Re-entrant because load(), set() and update_section() call save()
        # while already holding the lock.
        self._lock = threading.RLock()
        self._data = copy.deepcopy(DEFAULT_CONFIG)
        self._mtime = 0
        self.load()

    def load(self):
        """Load config from disk, merging with defaults for any missing keys.

        If the file does not exist, the current in-memory config is written
        out instead.  If the file cannot be read, is not valid JSON, or its
        top-level value is not an object, the error is logged and the
        in-memory config is reset to the defaults.
        """
        with self._lock:
            if not os.path.exists(self._path):
                log.info("No config file found at %s, creating with defaults", self._path)
                self.save()
                return

            mtime = None
            try:
                # Stat *before* reading: a write that lands while we parse
                # then still looks newer than what we recorded and is picked
                # up by the next reload_if_changed().
                mtime = os.path.getmtime(self._path)
                with open(self._path, 'r', encoding="utf-8") as f:
                    user_config = json.load(f)
                if not isinstance(user_config, dict):
                    raise ValueError(
                        "top-level JSON value must be an object, got %s"
                        % type(user_config).__name__
                    )
            except (ValueError, OSError) as e:
                # ValueError covers json.JSONDecodeError and
                # UnicodeDecodeError; OSError covers IOError.
                log.error("Failed to load config from %s: %s. Using defaults.", self._path, e)
                self._data = copy.deepcopy(DEFAULT_CONFIG)
                if mtime is not None:
                    # Remember the broken file's timestamp so polling does
                    # not re-parse and re-log it until it changes again.
                    self._mtime = mtime
                return

            self._data = self._deep_merge(copy.deepcopy(DEFAULT_CONFIG), user_config)
            self._mtime = mtime
            log.info("Config loaded from %s", self._path)

    def save(self):
        """Write current config to disk.

        The config is serialised in memory, written to ``<path>.tmp`` and
        then moved over the real file, so a failed or interrupted write
        never leaves a truncated config behind.  I/O errors are logged, not
        raised.

        Raises:
            TypeError: If the config contains a value ``json`` cannot
                serialise.  The existing file is left untouched.
        """
        with self._lock:
            # Serialise first so a bad value raises before any file is opened.
            payload = json.dumps(self._data, indent=2)
            tmp_path = "%s.tmp" % self._path
            try:
                with open(tmp_path, 'w', encoding="utf-8") as f:
                    f.write(payload)
                    f.flush()
                    os.fsync(f.fileno())
                os.replace(tmp_path, self._path)
                self._mtime = os.path.getmtime(self._path)
                log.info("Config saved to %s", self._path)
            except OSError as e:
                log.error("Failed to save config to %s: %s", self._path, e)
                try:
                    os.remove(tmp_path)
                except OSError:
                    # Best-effort cleanup; the temp file may never have been
                    # created or may already have been moved.
                    pass

    def reload_if_changed(self):
        """Reload config from disk if the file has been modified externally.

        Returns:
            True if a reload was attempted because the file's modification
            time differs from the one last recorded, False otherwise
            (including when the file cannot be stat'ed).
        """
        with self._lock:
            try:
                current_mtime = os.path.getmtime(self._path)
            except OSError:
                return False
            # "!=" rather than ">" so a file restored from a backup with an
            # older timestamp is picked up as well.
            if current_mtime == self._mtime:
                return False
            log.info("Config file changed on disk, reloading")
            self.load()
            return True

    def get(self, section, key=None):
        """Get a config value. If key is None, returns the entire section.

        Args:
            section: Top-level section name.
            key: Key inside the section, or None for the whole section.

        Returns:
            A deep copy of the section (``{}`` if it does not exist) when
            ``key`` is None; otherwise a deep copy of the value, or None if
            the section or key is missing or the section is not a dict.
        """
        with self._lock:
            section_data = self._data.get(section, {})
            if key is None:
                return copy.deepcopy(section_data)
            if not isinstance(section_data, dict):
                return None
            # Copy so that mutating a returned list/dict cannot silently
            # change the live config without a save().
            return copy.deepcopy(section_data.get(key))

    def set(self, section, key, value):
        """Set a config value and save to disk.

        Args:
            section: Top-level section name; created if missing.
            key: Key inside the section.
            value: Value to store; a deep copy is kept.
        """
        with self._lock:
            self._data.setdefault(section, {})[key] = copy.deepcopy(value)
            self.save()

    def update_section(self, section, values):
        """Update multiple keys in a section and save to disk.

        Args:
            section: Top-level section name; created if missing.
            values: Mapping (or iterable of key/value pairs) to merge into
                the section; a deep copy is kept.
        """
        with self._lock:
            # dict() first so one-shot iterables of pairs can be copied too.
            new_values = copy.deepcopy(dict(values))
            self._data.setdefault(section, {}).update(new_values)
            self.save()

    @property
    def data(self):
        """Return a deep copy of the full config dict."""
        with self._lock:
            return copy.deepcopy(self._data)

    @staticmethod
    def _deep_merge(base, override):
        """Recursively merge override into base. Override values win.

        Neither argument is modified.  Only the dict levels that are merged
        are newly created; values that are carried over unchanged are shared
        by reference with ``base`` or ``override``.

        Args:
            base: Dict supplying the fallback values.
            override: Dict whose values take precedence.

        Returns:
            A new dict containing the merged result.
        """
        result = base.copy()
        for key, value in override.items():
            if key in result and isinstance(result[key], dict) and isinstance(value, dict):
                result[key] = Config._deep_merge(result[key], value)
            else:
                result[key] = value
        return result