diff options
Diffstat (limited to 'service/src')
| -rw-r--r-- | service/src/cache.rs | 63 | ||||
| -rw-r--r-- | service/src/config.rs | 244 | ||||
| -rw-r--r-- | service/src/dbus.rs | 667 | ||||
| -rw-r--r-- | service/src/lib.rs | 4 | ||||
| -rw-r--r-- | service/src/main.rs | 238 | ||||
| -rw-r--r-- | service/src/victron_mqtt.rs | 576 |
6 files changed, 1792 insertions, 0 deletions
diff --git a/service/src/cache.rs b/service/src/cache.rs new file mode 100644 index 0000000..acaf9d3 --- /dev/null +++ b/service/src/cache.rs @@ -0,0 +1,63 @@ +use std::{ + path::PathBuf, + time::{Duration, SystemTime}, +}; + +use anyhow::{Result, anyhow}; + +#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)] +struct SerialCacheEntry { + serial: String, + expires_at_unix: u64, +} + +fn cache_file_path() -> Result<PathBuf> { + let proj_dir = dirs::cache_dir() + .ok_or_else(|| anyhow!("No cache dir"))? + .join("camper-widget-refresh"); + Ok(proj_dir.join("serial.json")) +} + +pub async fn read_serial_cache() -> Result<Option<String>> { + let path = cache_file_path()?; + if !path.exists() { + return Ok(None); + } + let content = tokio::fs::read_to_string(&path) + .await + .map_err(|e| anyhow!("Failed to read cache file at {}: {}", path.display(), e))?; + let entry: SerialCacheEntry = serde_json::from_str(&content)?; + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH)? + .as_secs(); + if now <= entry.expires_at_unix { + Ok(Some(entry.serial)) + } else { + Ok(None) + } +} + +pub async fn write_serial_cache(serial: &str, ttl: Duration) -> Result<()> { + let path = cache_file_path()?; + if let Some(parent) = path.parent() { + tokio::fs::create_dir_all(parent).await.map_err(|e| { + anyhow!( + "Failed to create cache directory {}: {}", + parent.display(), + e + ) + })?; + } + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH)? + .as_secs(); + let entry = SerialCacheEntry { + serial: serial.to_string(), + expires_at_unix: now + ttl.as_secs(), + }; + let data = serde_json::to_string(&entry)?; + tokio::fs::write(&path, data) + .await + .map_err(|e| anyhow!("Failed to write cache file at {}: {}", path.display(), e))?; + Ok(()) +} diff --git a/service/src/config.rs b/service/src/config.rs new file mode 100644 index 0000000..dd98ec7 --- /dev/null +++ b/service/src/config.rs @@ -0,0 +1,244 @@ +use anyhow::Result; +use configparser::ini::Ini; +use std::path::PathBuf; +use std::sync::Arc; +use tokio::sync::RwLock; +use tokio_util::sync::CancellationToken; +use tracing::{debug, info, warn}; +use zbus::Connection; + +#[derive(Clone, Debug)] +pub struct Config { + pub endpoints: Vec<(String, u16)>, + pub username: Option<String>, + pub password: Option<String>, + pub client_id: String, + pub refresh_interval_seconds: u64, +} + +fn default_port() -> u16 { + 1883 +} +fn default_client_id() -> String { + "camper-widget-refresh".to_string() +} +fn default_refresh_interval() -> u64 { + 60 +} +// log level is now controlled via RUST_LOG in main; no per-config default needed + +fn plasma_applet_src_path() -> Option<PathBuf> { + dirs::config_dir().map(|d| d.join("plasma-org.kde.plasma.desktop-appletsrc")) +} + +fn find_applet_section_id(ini: &Ini) -> Option<String> { + for section in ini.sections() { + if let Some(plugin) = ini.get(§ion, "plugin") + && plugin == "craftknight.camper_widget" + { + return Some(section); + } + } + warn!("No applet section id found"); + None +} + +fn unescape_kde_value(s: &str) -> String { + s.replace("\\s", " ") + .replace("\\t", "\t") + .replace("\\n", "\n") + .replace("\\\\", "\\") +} + +fn parse_mqtt_hosts(raw: &str) -> Vec<(String, u16)> { + let unescaped = unescape_kde_value(raw); + let mut out: Vec<(String, u16)> = Vec::new(); + for entry in unescaped.split(',') { + let part = entry.trim(); + if part.is_empty() { + continue; + } + if let Some((host, port_str)) = part.rsplit_once(':') + && let Ok(port) = port_str.parse::<u16>() + { + out.push((host.trim().to_string(), port)); + continue; + } + out.push((part.to_string(), default_port())); + } + if out.is_empty() { + out.push(("127.0.0.1".to_string(), default_port())); + } + out +} + +pub async fn load_config() -> Config { + // Defaults + let mut endpoints: Vec<(String, u16)> = vec![("127.0.0.1".to_string(), default_port())]; + let mut username: Option<String> = None; + let mut password: Option<String> = None; + let client_id = default_client_id(); + let mut refresh_interval_seconds = default_refresh_interval(); + + if let Some(path) = plasma_applet_src_path() { + debug!("Reading Plasma INI config from {:?}", path); + match tokio::fs::read_to_string(&path).await { + Ok(contents) => { + let mut ini = Ini::new(); + if let Err(e) = ini.read(contents) { + warn!("Failed to parse Plasma INI config at {:?}: {}", path, e); + } + let sections = ini.sections(); + debug!("Found {} INI sections", sections.len()); + if let Some(base) = find_applet_section_id(&ini) { + debug!("Matched applet section id: {}", base); + // Build the [Configuration][General] section name from the found applet base + let sec_general = format!("{}][configuration][general", base); + if ini.sections().contains(&sec_general) { + debug!("Using section: {}", sec_general); + debug!("Looking under primary section: {}", sec_general); + let mut read_any = false; + let section = &sec_general; + debug!("Attempt reading values from section: {}", section); + if let Some(raw_hosts) = ini.get(section, "mqttHosts") { + debug!("Found mqttHosts='{}'", raw_hosts); + endpoints = parse_mqtt_hosts(&raw_hosts); + debug!("Parsed endpoints: {:?}", endpoints); + read_any = true; + } + if let Some(u) = ini.get(section, "mqttUsername") + && !u.is_empty() + { + username = Some(u); + debug!("Found mqttUsername set"); + read_any = true; + } + if let Some(p) = ini.get(section, "mqttPassword") + && !p.is_empty() + { + password = Some(p); + debug!("Found mqttPassword set (redacted)"); + read_any = true; + } + if let Some(sec) = ini.get(section, "refreshIntervalSeconds") + && let Ok(v) = sec.trim().parse::<u64>() + { + refresh_interval_seconds = v; + debug!("Found refreshIntervalSeconds={}", v); + read_any = true; + } + if !read_any { + warn!("No configuration keys found under {}", section); + } + } else { + warn!("Could not find target configuration section"); + } + } else { + warn!("No applet section id found"); + } + } + Err(e) => { + warn!("Failed to read Plasma INI config at {:?}: {}", path, e); + } + } + } else { + warn!("No config dir found (dirs::config_dir returned None)"); + } + + Config { + endpoints, + username, + password, + client_id, + refresh_interval_seconds, + } +} + +pub async fn listen_for_config_changes( + _conn: &Connection, + config_state: Arc<RwLock<Config>>, + shared_state: &crate::dbus::SharedState, + token: CancellationToken, +) -> Result<()> { + info!("Starting config reload monitor"); + + loop { + tokio::select! { + _ = shared_state.config_reload_notify.notified() => {} + _ = token.cancelled() => { + info!("Config reload monitor shutting down"); + return Ok(()); + } + } + + info!("Config reload triggered, reloading configuration"); + + let new_config = load_config().await; + + { + let mut config_guard = config_state.write().await; + *config_guard = new_config; + } + + info!("Configuration reloaded successfully"); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_mqtt_hosts_various() { + assert_eq!( + parse_mqtt_hosts("127.0.0.1"), + vec![("127.0.0.1".to_string(), 1883)] + ); + assert_eq!( + parse_mqtt_hosts("127.0.0.1:1883"), + vec![("127.0.0.1".to_string(), 1883)] + ); + assert_eq!( + parse_mqtt_hosts("127.0.0.1,192.168.1.111"), + vec![ + ("127.0.0.1".to_string(), 1883), + ("192.168.1.111".to_string(), 1883) + ] + ); + assert_eq!( + parse_mqtt_hosts("127.0.0.1:1883,192.168.1.111"), + vec![ + ("127.0.0.1".to_string(), 1883), + ("192.168.1.111".to_string(), 1883) + ] + ); + assert_eq!( + parse_mqtt_hosts("127.0.0.1:1883,192.168.1.111:1833"), + vec![ + ("127.0.0.1".to_string(), 1883), + ("192.168.1.111".to_string(), 1833) + ] + ); + // KDE config escapes leading spaces as \s + assert_eq!( + parse_mqtt_hosts("\\s192.168.10.202 :1883"), + vec![("192.168.10.202".to_string(), 1883)] + ); + } + + #[test] + fn test_find_applet_section_id() { + let mut ini = Ini::new(); + // Base section like [Containments][85][Applets][133] + ini.set( + "Containments][85][Applets][133", + "plugin", + Some("craftknight.camper_widget".to_string()), + ); + let found = find_applet_section_id(&ini); + // configparser normalizes section names to lowercase + assert_eq!(found, Some("containments][85][applets][133".to_string())); + } + + // We avoid testing load_config() directly because it depends on user's plasma file. +} diff --git a/service/src/dbus.rs b/service/src/dbus.rs new file mode 100644 index 0000000..2a07134 --- /dev/null +++ b/service/src/dbus.rs @@ -0,0 +1,667 @@ +use std::sync::Arc; + +use anyhow::Result; +use tokio::sync::{Notify, RwLock, Semaphore}; + +use tracing::{error, warn}; +use zbus::{Connection, connection, interface}; + +// Object paths +pub const ROOT_OBJECT_PATH: &str = "/org/craftknight/camper_widget"; +pub const BATTERY_OBJECT_PATH: &str = "/org/craftknight/camper_widget/Battery"; +pub const SOLAR_OBJECT_PATH: &str = "/org/craftknight/camper_widget/Solar"; +pub const AC_INPUT_OBJECT_PATH: &str = "/org/craftknight/camper_widget/AcInput"; +pub const AC_LOAD_OBJECT_PATH: &str = "/org/craftknight/camper_widget/AcLoad"; + +#[derive(Clone)] +pub struct SharedState { + // Status interface data + pub connected: Arc<RwLock<bool>>, + + // Battery interface data + pub battery_soc: Arc<RwLock<f64>>, + pub battery_power: Arc<RwLock<f64>>, + pub battery_state: Arc<RwLock<String>>, + + // Solar interface data + pub solar_power: Arc<RwLock<f64>>, + + // AC input interface data + pub ac_input_power: Arc<RwLock<f64>>, + + // AC load interface data + pub ac_load_power: Arc<RwLock<f64>>, + + // Config reload notification + pub config_reload_notify: Arc<Notify>, + + // Backpressure for D-Bus signal emission + pub signal_semaphore: Arc<Semaphore>, +} + +impl Default for SharedState { + fn default() -> Self { + Self { + connected: Arc::new(RwLock::new(false)), + battery_soc: Arc::new(RwLock::new(0.0)), + battery_power: Arc::new(RwLock::new(0.0)), + battery_state: Arc::new(RwLock::new("idle".to_string())), + solar_power: Arc::new(RwLock::new(0.0)), + ac_input_power: Arc::new(RwLock::new(0.0)), + ac_load_power: Arc::new(RwLock::new(0.0)), + config_reload_notify: Arc::new(Notify::new()), + signal_semaphore: Arc::new(Semaphore::new(10)), + } + } +} + +#[derive(Clone)] +pub struct StatusInterface { + state: SharedState, +} + +impl StatusInterface { + fn new(state: SharedState) -> Self { + Self { state } + } +} + +#[derive(Clone)] +pub struct BatteryInterface { + state: SharedState, +} + +impl BatteryInterface { + fn new(state: SharedState) -> Self { + Self { state } + } +} + +#[derive(Clone)] +pub struct SolarInterface { + state: SharedState, +} + +impl SolarInterface { + fn new(state: SharedState) -> Self { + Self { state } + } +} + +#[derive(Clone)] +pub struct AcInputInterface { + state: SharedState, +} + +impl AcInputInterface { + fn new(state: SharedState) -> Self { + Self { state } + } +} + +#[derive(Clone)] +pub struct AcLoadInterface { + state: SharedState, +} + +impl AcLoadInterface { + fn new(state: SharedState) -> Self { + Self { state } + } +} + +#[derive(Clone)] +pub struct ConfigInterface { + #[allow(dead_code)] + state: SharedState, +} + +impl ConfigInterface { + fn new(state: SharedState) -> Self { + Self { state } + } +} + +// Status interface - Connected property +#[interface(name = "org.craftknight.CamperWidget.Status")] +impl StatusInterface { + #[zbus(property(emits_changed_signal = "true"))] + async fn connected(&self) -> bool { + *self.state.connected.read().await + } +} + +// Battery interface - SoC, Power, State properties +#[interface(name = "org.craftknight.CamperWidget.Battery")] +impl BatteryInterface { + #[zbus(property(emits_changed_signal = "true"))] + async fn soc(&self) -> f64 { + *self.state.battery_soc.read().await + } + + #[zbus(property(emits_changed_signal = "true"))] + async fn power(&self) -> f64 { + *self.state.battery_power.read().await + } + + #[zbus(property(emits_changed_signal = "true"))] + async fn state(&self) -> String { + self.state.battery_state.read().await.clone() + } +} + +// Solar interface - Power property +#[interface(name = "org.craftknight.CamperWidget.Solar")] +impl SolarInterface { + #[zbus(property(emits_changed_signal = "true"))] + async fn power(&self) -> f64 { + *self.state.solar_power.read().await + } +} + +// AC input interface - Power property +#[interface(name = "org.craftknight.CamperWidget.AcInput")] +impl AcInputInterface { + #[zbus(property(emits_changed_signal = "true"))] + async fn power(&self) -> f64 { + *self.state.ac_input_power.read().await + } +} + +// AC load interface - Power property +#[interface(name = "org.craftknight.CamperWidget.AcLoad")] +impl AcLoadInterface { + #[zbus(property(emits_changed_signal = "true"))] + async fn power(&self) -> f64 { + *self.state.ac_load_power.read().await + } +} + +// Config interface for handling configuration reloads +#[interface(name = "org.craftknight.CamperWidget.Config")] +impl ConfigInterface { + /// Reload configuration method + async fn reload(&self) -> zbus::fdo::Result<()> { + self.state.config_reload_notify.notify_one(); + Ok(()) + } +} + +// Start a session bus service and return the connection; keep it alive in main +pub async fn start_service() -> Result<(Connection, SharedState)> { + let shared = SharedState::default(); + + let status_server = StatusInterface::new(shared.clone()); + let battery_server = BatteryInterface::new(shared.clone()); + let solar_server = SolarInterface::new(shared.clone()); + let ac_input_server = AcInputInterface::new(shared.clone()); + let ac_load_server = AcLoadInterface::new(shared.clone()); + let config_server = ConfigInterface::new(shared.clone()); + + let conn = connection::Builder::session()? + .name("org.craftknight.CamperWidget")? + .serve_at(ROOT_OBJECT_PATH, status_server)? + .serve_at(ROOT_OBJECT_PATH, config_server)? + .serve_at(BATTERY_OBJECT_PATH, battery_server)? + .serve_at(SOLAR_OBJECT_PATH, solar_server)? + .serve_at(AC_INPUT_OBJECT_PATH, ac_input_server)? + .serve_at(AC_LOAD_OBJECT_PATH, ac_load_server)? + .build() + .await?; + + Ok((conn, shared)) +} + +/// Start a D-Bus service at a custom address (for integration testing). +pub async fn start_service_at(address: &str) -> Result<(Connection, SharedState)> { + let shared = SharedState::default(); + + let status_server = StatusInterface::new(shared.clone()); + let battery_server = BatteryInterface::new(shared.clone()); + let solar_server = SolarInterface::new(shared.clone()); + let ac_input_server = AcInputInterface::new(shared.clone()); + let ac_load_server = AcLoadInterface::new(shared.clone()); + let config_server = ConfigInterface::new(shared.clone()); + + let conn = connection::Builder::address(address)? + .name("org.craftknight.CamperWidget")? + .serve_at(ROOT_OBJECT_PATH, status_server)? + .serve_at(ROOT_OBJECT_PATH, config_server)? + .serve_at(BATTERY_OBJECT_PATH, battery_server)? + .serve_at(SOLAR_OBJECT_PATH, solar_server)? + .serve_at(AC_INPUT_OBJECT_PATH, ac_input_server)? + .serve_at(AC_LOAD_OBJECT_PATH, ac_load_server)? + .build() + .await?; + + Ok((conn, shared)) +} + +// Property types for individual updates +#[derive(Debug, Clone, PartialEq)] +pub enum PropertyType { + BatterySoc, + BatteryPower, + BatteryState, + SolarPower, + AcInputPower, + AcLoadPower, +} + +// Update connected status and emit signal +pub async fn update_connected_status( + conn: &Connection, + state: &SharedState, + connected: bool, +) -> Result<()> { + let object_server = conn.object_server(); + + let mut guard = state.connected.write().await; + if *guard != connected { + *guard = connected; + // Emit signal for connected status asynchronously + let object_server_clone = object_server.clone(); + match state.signal_semaphore.clone().try_acquire_owned() { + Ok(permit) => { + tokio::spawn(async move { + let _permit = permit; + if let Ok(iface_ref) = object_server_clone + .interface::<_, StatusInterface>(ROOT_OBJECT_PATH) + .await + { + let iface = iface_ref.get_mut().await; + if let Err(e) = iface.connected_changed(iface_ref.signal_emitter()).await { + error!("Connected: Failed to emit signal: {}", e); + } + } else { + error!("Connected: Failed to get interface at {}", ROOT_OBJECT_PATH); + } + }); + } + Err(_) => { + warn!("Signal emission backpressure: dropping signal for Connected"); + } + } + } + + Ok(()) +} + +// Update battery state from a string directly (used by Batteries topic) +pub async fn update_battery_state_str( + conn: &Connection, + state: &SharedState, + state_str: &str, +) -> Result<()> { + let object_server = conn.object_server(); + let mut guard = state.battery_state.write().await; + if *guard != state_str { + *guard = state_str.to_string(); + let object_server_clone = object_server.clone(); + match state.signal_semaphore.clone().try_acquire_owned() { + Ok(permit) => { + tokio::spawn(async move { + let _permit = permit; + if let Ok(iface_ref) = object_server_clone + .interface::<_, BatteryInterface>(BATTERY_OBJECT_PATH) + .await + { + let iface = iface_ref.get_mut().await; + if let Err(e) = iface.state_changed(iface_ref.signal_emitter()).await { + error!("BatteryState: Failed to emit signal: {}", e); + } + } + }); + } + Err(_) => { + warn!("Signal emission backpressure: dropping signal for BatteryState"); + } + } + } + Ok(()) +} + +// Update a single DBus property immediately +pub async fn update_single_property( + conn: &Connection, + state: &SharedState, + property_type: &PropertyType, + value: f64, +) -> Result<()> { + let object_server = conn.object_server(); + + match property_type { + PropertyType::BatterySoc => { + let mut guard = state.battery_soc.write().await; + if (*guard - value).abs() > f64::EPSILON { + *guard = value; + // Emit signal for battery SoC asynchronously + let object_server_clone = object_server.clone(); + match state.signal_semaphore.clone().try_acquire_owned() { + Ok(permit) => { + tokio::spawn(async move { + let _permit = permit; + if let Ok(iface_ref) = object_server_clone + .interface::<_, BatteryInterface>(BATTERY_OBJECT_PATH) + .await + { + let iface = iface_ref.get_mut().await; + if let Err(e) = iface.soc_changed(iface_ref.signal_emitter()).await + { + error!("BatterySoc: Failed to emit signal: {}", e); + } + } else { + error!( + "BatterySoc: Failed to get interface at {}", + BATTERY_OBJECT_PATH + ); + } + }); + } + Err(_) => { + warn!("Signal emission backpressure: dropping signal for BatterySoc"); + } + } + } + } + PropertyType::BatteryPower => { + let mut guard = state.battery_power.write().await; + if (*guard - value).abs() > f64::EPSILON { + *guard = value; + // Emit signal for battery power asynchronously + let object_server_clone = object_server.clone(); + match state.signal_semaphore.clone().try_acquire_owned() { + Ok(permit) => { + tokio::spawn(async move { + let _permit = permit; + if let Ok(iface_ref) = object_server_clone + .interface::<_, BatteryInterface>(BATTERY_OBJECT_PATH) + .await + { + let iface = iface_ref.get_mut().await; + if let Err(e) = + iface.power_changed(iface_ref.signal_emitter()).await + { + error!("BatteryPower: Failed to emit signal: {}", e); + } + } else { + error!( + "BatteryPower: Failed to get interface at {}", + BATTERY_OBJECT_PATH + ); + } + }); + } + Err(_) => { + warn!("Signal emission backpressure: dropping signal for BatteryPower"); + } + } + } + } + PropertyType::BatteryState => { + let state_str = if value > 5.0 { + "charging".to_string() + } else if value < -5.0 { + "discharging".to_string() + } else { + "idle".to_string() + }; + + let mut guard = state.battery_state.write().await; + if *guard != state_str { + *guard = state_str.clone(); + // Emit signal for battery state asynchronously + let object_server_clone = object_server.clone(); + match state.signal_semaphore.clone().try_acquire_owned() { + Ok(permit) => { + tokio::spawn(async move { + let _permit = permit; + if let Ok(iface_ref) = object_server_clone + .interface::<_, BatteryInterface>(BATTERY_OBJECT_PATH) + .await + { + let iface = iface_ref.get_mut().await; + if let Err(e) = + iface.state_changed(iface_ref.signal_emitter()).await + { + error!("BatteryState: Failed to emit signal: {}", e); + } + } else { + error!( + "BatteryState: Failed to get interface at {}", + BATTERY_OBJECT_PATH + ); + } + }); + } + Err(_) => { + warn!("Signal emission backpressure: dropping signal for BatteryState"); + } + } + } + } + PropertyType::SolarPower => { + let mut guard = state.solar_power.write().await; + if (*guard - value).abs() > f64::EPSILON { + *guard = value; + // Emit signal for solar power asynchronously + let object_server_clone = object_server.clone(); + match state.signal_semaphore.clone().try_acquire_owned() { + Ok(permit) => { + tokio::spawn(async move { + let _permit = permit; + if let Ok(iface_ref) = object_server_clone + .interface::<_, SolarInterface>(SOLAR_OBJECT_PATH) + .await + { + let iface = iface_ref.get_mut().await; + if let Err(e) = + iface.power_changed(iface_ref.signal_emitter()).await + { + error!("SolarPower: Failed to emit signal: {}", e); + } + } else { + error!( + "SolarPower: Failed to get interface at {}", + SOLAR_OBJECT_PATH + ); + } + }); + } + Err(_) => { + warn!("Signal emission backpressure: dropping signal for SolarPower"); + } + } + } + } + PropertyType::AcInputPower => { + let mut guard = state.ac_input_power.write().await; + if (*guard - value).abs() > f64::EPSILON { + *guard = value; + let object_server_clone = object_server.clone(); + match state.signal_semaphore.clone().try_acquire_owned() { + Ok(permit) => { + tokio::spawn(async move { + let _permit = permit; + if let Ok(iface_ref) = object_server_clone + .interface::<_, AcInputInterface>(AC_INPUT_OBJECT_PATH) + .await + { + let iface = iface_ref.get_mut().await; + if let Err(e) = + iface.power_changed(iface_ref.signal_emitter()).await + { + error!("AcInputPower: Failed to emit signal: {}", e); + } + } else { + error!( + "AcInputPower: Failed to get interface at {}", + AC_INPUT_OBJECT_PATH + ); + } + }); + } + Err(_) => { + warn!("Signal emission backpressure: dropping signal for AcInputPower"); + } + } + } + } + PropertyType::AcLoadPower => { + let mut guard = state.ac_load_power.write().await; + if (*guard - value).abs() > f64::EPSILON { + *guard = value; + let object_server_clone = object_server.clone(); + match state.signal_semaphore.clone().try_acquire_owned() { + Ok(permit) => { + tokio::spawn(async move { + let _permit = permit; + if let Ok(iface_ref) = object_server_clone + .interface::<_, AcLoadInterface>(AC_LOAD_OBJECT_PATH) + .await + { + let iface = iface_ref.get_mut().await; + if let Err(e) = + iface.power_changed(iface_ref.signal_emitter()).await + { + error!("AcLoadPower: Failed to emit signal: {}", e); + } + } else { + error!( + "AcLoadPower: Failed to get interface at {}", + AC_LOAD_OBJECT_PATH + ); + } + }); + } + Err(_) => { + warn!("Signal emission backpressure: dropping signal for AcLoadPower"); + } + } + } + } + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_shared_state_initialization() { + let state = SharedState::default(); + assert!(!(*state.connected.read().await)); + assert_eq!(*state.battery_soc.read().await, 0.0); + assert_eq!(*state.battery_power.read().await, 0.0); + assert_eq!(*state.battery_state.read().await, "idle"); + assert_eq!(*state.solar_power.read().await, 0.0); + assert_eq!(*state.ac_input_power.read().await, 0.0); + assert_eq!(*state.ac_load_power.read().await, 0.0); + } + + #[tokio::test] + async fn test_property_updates() { + let state = SharedState::default(); + + // Test connected status update + { + let mut guard = state.connected.write().await; + *guard = true; + } + assert!(*state.connected.read().await); + + // Test battery property updates + { + let mut soc_guard = state.battery_soc.write().await; + *soc_guard = 75.0; + } + { + let mut power_guard = state.battery_power.write().await; + *power_guard = -15.0; + } + { + let mut state_guard = state.battery_state.write().await; + *state_guard = "discharging".to_string(); + } + + assert_eq!(*state.battery_soc.read().await, 75.0); + assert_eq!(*state.battery_power.read().await, -15.0); + assert_eq!(*state.battery_state.read().await, "discharging"); + + // Test solar power update + { + let mut solar_guard = state.solar_power.write().await; + *solar_guard = 120.0; + } + assert_eq!(*state.solar_power.read().await, 120.0); + + // Test AC input power update + { + let mut ac_input_guard = state.ac_input_power.write().await; + *ac_input_guard = 82.0; + } + assert_eq!(*state.ac_input_power.read().await, 82.0); + + // Test AC load power update + { + let mut ac_load_guard = state.ac_load_power.write().await; + *ac_load_guard = 77.0; + } + assert_eq!(*state.ac_load_power.read().await, 77.0); + } + + #[tokio::test] + async fn test_interface_creation() { + let state = SharedState::default(); + + let _status_interface = StatusInterface::new(state.clone()); + let _battery_interface = BatteryInterface::new(state.clone()); + let _solar_interface = SolarInterface::new(state.clone()); + let _ac_input_interface = AcInputInterface::new(state.clone()); + let _ac_load_interface = AcLoadInterface::new(state.clone()); + let _config_interface = ConfigInterface::new(state.clone()); + + // Test that interfaces can be created without panicking + // If we get here, creation succeeded + } + + #[test] + fn test_property_change_detection() { + // Test floating point comparison logic without async + let value1: f64 = 50.0; + let value2: f64 = 50.1; + + assert!((value1 - 50.0_f64).abs() < f64::EPSILON); + assert!((value2 - 50.1_f64).abs() < f64::EPSILON); + assert!((value1 - value2).abs() > f64::EPSILON); + } + + #[tokio::test] + async fn test_battery_state_mapping() { + // Test the battery state mapping logic from main.rs + let test_cases = vec![ + (10.0, "charging"), + (-10.0, "discharging"), + (0.0, "idle"), + (3.0, "idle"), // Below threshold + (-3.0, "idle"), // Above negative threshold + ]; + + for (power, expected_state) in test_cases { + let direction = if power > 5.0 { + "charging".to_string() + } else if power < -5.0 { + "discharging".to_string() + } else { + "idle".to_string() + }; + + assert_eq!( + direction, expected_state, + "Power {power} should map to {expected_state}" + ); + } + } +} diff --git a/service/src/lib.rs b/service/src/lib.rs new file mode 100644 index 0000000..56bf445 --- /dev/null +++ b/service/src/lib.rs @@ -0,0 +1,4 @@ +pub mod cache; +pub mod config; +pub mod dbus; +pub mod victron_mqtt; diff --git a/service/src/main.rs b/service/src/main.rs new file mode 100644 index 0000000..51f7ff7 --- /dev/null +++ b/service/src/main.rs @@ -0,0 +1,238 @@ +use std::sync::Arc; +use std::time::Duration; + +use anyhow::Result; +use camper_widget_refresh::cache::{read_serial_cache, write_serial_cache}; +use camper_widget_refresh::config::{Config, listen_for_config_changes, load_config}; +use camper_widget_refresh::dbus::{self, start_service, update_connected_status}; +use camper_widget_refresh::victron_mqtt::{ + build_mqtt_options, disconnect_mqtt, pick_first_available, + read_values_and_update_properties_immediately, wait_for_serial, +}; +use rumqttc::AsyncClient; +use tokio::sync::RwLock; +use tokio::task::JoinHandle; +use tokio::time; +use tokio_util::sync::CancellationToken; +use tracing::{error, info, warn}; + +const MAX_CONFIG_LISTENER_BACKOFF_SECS: u64 = 30; + +async fn refresh_data( + cfg: &Config, + conn: &zbus::Connection, + dbus_state: &dbus::SharedState, +) -> Result<()> { + let (host, port) = pick_first_available(&cfg.endpoints).await?; + info!("Connecting to MQTT {}:{} as {}", host, port, cfg.client_id); + + // Set connected to true when we found a reachable host + update_connected_status(conn, dbus_state, true).await?; + + let mqttoptions = build_mqtt_options(cfg, &host, port); + let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10); + + // Try cache for serial first + let mut serial: Option<String> = match read_serial_cache().await { + Ok(Some(s)) => { + info!("Using cached serial: {}", s); + Some(s) + } + Ok(None) => None, + Err(e) => { + warn!("Failed reading cache: {}", e); + None + } + }; + + if serial.is_none() { + info!("Waiting for serial on N/+/system/0/Serial..."); + let s = wait_for_serial(&client, &mut eventloop).await?; + // Long TTL 30 days + if let Err(e) = write_serial_cache(&s, Duration::from_secs(30 * 24 * 3600)).await { + warn!("Failed to write cache: {}", e); + } + serial = Some(s); + } + + let serial = serial.expect("serial should be set"); + + // Read values and update DBus properties immediately as each MQTT message arrives + let result = read_values_and_update_properties_immediately( + &client, + &mut eventloop, + &serial, + conn, + dbus_state, + ) + .await; + + disconnect_mqtt(&client).await; + + match result? { + true => { + info!("Completed refresh cycle - all properties updated"); + } + false => { + warn!("Refresh cycle completed with partial data - some MQTT topics timed out"); + } + } + + Ok(()) +} + +fn spawn_config_listener( + conn: zbus::Connection, + config_state: Arc<RwLock<Config>>, + shared_state: dbus::SharedState, + token: CancellationToken, +) -> JoinHandle<()> { + tokio::spawn(async move { + if let Err(e) = listen_for_config_changes(&conn, config_state, &shared_state, token).await { + error!("Config change listener failed: {}", e); + } + }) +} + +#[tokio::main] +async fn main() -> Result<()> { + // Initialize tracing early so debug logs from config loading are visible + let env_filter = tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")); + let _ = tracing::subscriber::set_global_default( + tracing_subscriber::fmt() + .with_env_filter(env_filter) + .with_target(false) + .finish(), + ); + + let initial_cfg = load_config().await; + + // Start D-Bus service and keep connection/state alive + let (dbus_conn, dbus_state) = start_service().await?; + + // Create shared config state + let config_state = Arc::new(RwLock::new(initial_cfg)); + + // Graceful shutdown via CancellationToken + let shutdown_token = CancellationToken::new(); + let ctrl_c_token = shutdown_token.clone(); + tokio::spawn(async move { + if let Err(e) = tokio::signal::ctrl_c().await { + error!("Failed to listen for ctrl-c: {}", e); + return; + } + info!("Received shutdown signal, initiating graceful shutdown"); + ctrl_c_token.cancel(); + }); + + info!( + "Starting camper-widget-refresh with refresh interval: {} seconds", + config_state.read().await.refresh_interval_seconds + ); + + // Spawn the config change listener with supervision + let mut config_listener_handle = spawn_config_listener( + dbus_conn.clone(), + config_state.clone(), + dbus_state.clone(), + shutdown_token.child_token(), + ); + let mut config_backoff_secs: u64 = 1; + + // Run initial refresh + { + let cfg = config_state.read().await; + if let Err(e) = refresh_data(&cfg, &dbus_conn, &dbus_state).await { + warn!("Initial refresh failed: {}", e); + // Set connected to false when refresh fails + if let Err(update_err) = update_connected_status(&dbus_conn, &dbus_state, false).await { + warn!("Failed to update connected status: {}", update_err); + } + } + } + + // Run refresh loop with dynamic interval + while !shutdown_token.is_cancelled() { + // Supervise config listener — respawn with backoff if it died + if config_listener_handle.is_finished() && !shutdown_token.is_cancelled() { + error!( + "Config listener exited unexpectedly, respawning in {}s", + config_backoff_secs + ); + time::sleep(Duration::from_secs(config_backoff_secs)).await; + config_backoff_secs = (config_backoff_secs * 2).min(MAX_CONFIG_LISTENER_BACKOFF_SECS); + config_listener_handle = spawn_config_listener( + dbus_conn.clone(), + config_state.clone(), + dbus_state.clone(), + shutdown_token.child_token(), + ); + } + + let refresh_interval = { + let cfg = config_state.read().await; + Duration::from_secs(cfg.refresh_interval_seconds) + }; + + // Race sleep against shutdown + tokio::select! { + _ = time::sleep(refresh_interval) => {} + _ = shutdown_token.cancelled() => break, + } + + let cfg = config_state.read().await; + match refresh_data(&cfg, &dbus_conn, &dbus_state).await { + Ok(()) => { + // Successful refresh — reset config listener backoff + config_backoff_secs = 1; + } + Err(e) => { + warn!("Refresh failed: {}", e); + if let Err(update_err) = + update_connected_status(&dbus_conn, &dbus_state, false).await + { + warn!("Failed to update connected status: {}", update_err); + } + } + } + } + + info!("Shutting down cleanly"); + Ok(()) +} + +#[cfg(test)] +mod tests { + + // parse_numeric_value tests moved to victron_mqtt.rs + + #[test] + fn test_battery_direction_mapping() { + // Test the direction mapping logic + let test_cases = vec![ + (10.0, "charging"), + (-10.0, "discharging"), + (0.0, "idle"), + (3.0, "idle"), // Below threshold + (-3.0, "idle"), // Above negative threshold + (5.1, "charging"), // Just above threshold + (-5.1, "discharging"), // Just below negative threshold + ]; + + for (power, expected) in test_cases { + let direction = if power > 5.0 { + "charging".to_string() + } else if power < -5.0 { + "discharging".to_string() + } else { + "idle".to_string() + }; + + assert_eq!( + direction, expected, + "Power {power} should map to {expected}" + ); + } + } +} diff --git a/service/src/victron_mqtt.rs b/service/src/victron_mqtt.rs new file mode 100644 index 0000000..d72ddb3 --- /dev/null +++ b/service/src/victron_mqtt.rs @@ -0,0 +1,576 @@ +use std::time::Duration; + +use crate::config::Config; +use crate::dbus::{PropertyType, SharedState, update_battery_state_str, update_single_property}; +use anyhow::{Result, anyhow}; +use rumqttc::{AsyncClient, Event, EventLoop, Incoming, MqttOptions, QoS}; +use tokio::{net::TcpStream, select, time}; +use tracing::{debug, info, warn}; +use zbus::Connection; + +// Topic-to-property mapping structure (for simple 1:1 topic→property mappings) +#[derive(Debug, Clone)] +pub struct TopicPropertyMapping { + pub topic_pattern: String, + pub property_type: PropertyType, +} + +impl TopicPropertyMapping { + pub fn new(topic_pattern: String, property_type: PropertyType) -> Self { + Self { + topic_pattern, + property_type, + } + } +} + +// Collection of all topic mappings for a given serial +pub struct MqttPropertyMappings { + mappings: Vec<TopicPropertyMapping>, + batteries_topic: String, // system/0/Batteries — yields SoC, Power, State +} + +impl MqttPropertyMappings { + pub fn new(serial: &str) -> Self { + Self { + mappings: vec![ + TopicPropertyMapping::new( + format!("N/{serial}/solarcharger/277/Yield/Power"), + PropertyType::SolarPower, + ), + TopicPropertyMapping::new( + format!("N/{serial}/system/0/Ac/Grid/L1/Power"), + PropertyType::AcInputPower, + ), + TopicPropertyMapping::new( + format!("N/{serial}/system/0/Ac/Consumption/L1/Power"), + PropertyType::AcLoadPower, + ), + ], + batteries_topic: format!("N/{serial}/system/0/Batteries"), + } + } + + // Find mapping for a given topic (simple 1:1 mappings only) + pub fn find_mapping(&self, topic: &str) -> Option<&TopicPropertyMapping> { + self.mappings.iter().find(|m| m.topic_pattern == topic) + } + + // Check if topic is the composite Batteries topic + pub fn is_batteries_topic(&self, topic: &str) -> bool { + self.batteries_topic == topic + } + + // Get all topic patterns for subscription + pub fn get_subscription_topics(&self) -> Vec<&str> { + let mut topics: Vec<&str> = self + .mappings + .iter() + .map(|m| m.topic_pattern.as_str()) + .collect(); + topics.push(&self.batteries_topic); + topics + } +} + +/// Parse the system/0/Batteries JSON payload. +/// Returns (soc, power, state_string) from the first battery entry. +pub fn parse_batteries_payload(payload: &str) -> Option<(f64, f64, String)> { + let json: serde_json::Value = serde_json::from_str(payload).ok()?; + let batteries = json.get("value")?.as_array()?; + let battery = batteries.first()?; + let soc = battery.get("soc")?.as_f64()?; + let power = battery.get("power")?.as_f64()?; + let state_num = battery.get("state")?.as_i64().unwrap_or(0); + let state_str = match state_num { + 1 => "charging", + 2 => "discharging", + _ => "idle", + }; + Some((soc, power, state_str.to_string())) +} + +// Parse numeric value from MQTT payload (moved from main.rs) +fn parse_numeric_value(s: Option<&String>) -> f64 { + if let Some(raw) = s { + // First try direct number parsing + if let Ok(v) = raw.trim().parse::<f64>() { + return v; + } + // Try JSON object with {"value": number} + if let Ok(val) = serde_json::from_str::<serde_json::Value>(raw) + && let Some(v) = val.get("value") + { + if let Some(n) = v.as_f64() { + return n; + } + if let Some(s) = v.as_str() + && let Ok(n) = s.trim().parse::<f64>() + { + return n; + } + } + } + 0.0 +} + +// Returns Some(serial) when topic is like "N/{serial}/system/0/Serial" +pub fn extract_serial_from_topic(topic: &str) -> Option<String> { + if topic.starts_with("N/") && topic.ends_with("/system/0/Serial") { + let parts: Vec<&str> = topic.split('/').collect(); + if parts.len() >= 3 { + return Some(parts[1].to_string()); + } + } + None +} + +pub async fn disconnect_mqtt(client: &AsyncClient) { + if let Err(e) = client.disconnect().await { + warn!("MQTT disconnect failed: {}", e); + } +} + +pub fn build_mqtt_options(cfg: &Config, host: &str, port: u16) -> MqttOptions { + let mut opts = MqttOptions::new(&cfg.client_id, host, port); + opts.set_keep_alive(Duration::from_secs(10)); + if let (Some(u), Some(p)) = (&cfg.username, &cfg.password) { + opts.set_credentials(u, p); + } + opts +} + +pub async fn pick_first_available(endpoints: &[(String, u16)]) -> Result<(String, u16)> { + for (host, port) in endpoints { + let addr = (host.as_str(), *port); + let attempt = time::timeout(Duration::from_secs(2), TcpStream::connect(addr)).await; + match attempt { + Ok(Ok(stream)) => { + // Successfully connected; close and return this endpoint + drop(stream); + return Ok((host.clone(), *port)); + } + _ => { + // Try next + } + } + } + Err(anyhow!("No reachable MQTT endpoint from config")) +} + +pub async fn wait_for_serial(client: &AsyncClient, eventloop: &mut EventLoop) -> Result<String> { + // Subscribe to N/+/system/0/Serial + client + .subscribe("N/+/system/0/Serial", QoS::AtLeastOnce) + .await?; + loop { + match eventloop.poll().await? { + Event::Incoming(Incoming::Publish(p)) => { + let topic = p.topic.clone(); + if let Some(serial) = extract_serial_from_topic(&topic) { + info!("Detected serial from topic: {}", serial); + return Ok(serial); + } + } + _ => { + // Ignore other events + } + } + } +} + +/// Read values and update DBus properties immediately as each message arrives. +/// Returns `Ok(true)` if all topics were received, `Ok(false)` on timeout with partial data. +pub async fn read_values_and_update_properties_immediately( + client: &AsyncClient, + eventloop: &mut EventLoop, + serial: &str, + conn: &Connection, + dbus_state: &SharedState, +) -> Result<bool> { + let mappings = MqttPropertyMappings::new(serial); + let topics = mappings.get_subscription_topics(); + + // Subscribe to all topics + for topic in &topics { + client.subscribe(*topic, QoS::AtLeastOnce).await?; + } + debug!( + "Subscribed to {} topics for serial={}", + topics.len(), + serial + ); + + let mut received_topics = std::collections::HashSet::new(); + let total_topics = topics.len(); + + // We'll wait up to 30 seconds for all values + let timeout = time::sleep(Duration::from_secs(30)); + tokio::pin!(timeout); + + let all_received = loop { + select! { + _ = &mut timeout => { + warn!("Timeout while waiting for all values, received {} of {}", received_topics.len(), total_topics); + break false; + } + ev = eventloop.poll() => { + match ev? { + Event::Incoming(Incoming::Publish(p)) => { + let topic = p.topic.clone(); + let payload_str = String::from_utf8_lossy(&p.payload).to_string(); + + if let Some(mapping) = mappings.find_mapping(&topic) { + let value = parse_numeric_value(Some(&payload_str)); + update_single_property(conn, dbus_state, &mapping.property_type, value).await?; + received_topics.insert(topic.clone()); + info!("Updated {:?} with value {} ({} of {} topics received)", + mapping.property_type, value, received_topics.len(), total_topics); + } else if mappings.is_batteries_topic(&topic) + && let Some((soc, power, state_str)) = parse_batteries_payload(&payload_str) + { + update_single_property(conn, dbus_state, &PropertyType::BatterySoc, soc).await?; + update_single_property(conn, dbus_state, &PropertyType::BatteryPower, power).await?; + update_battery_state_str(conn, dbus_state, &state_str).await?; + received_topics.insert(topic.clone()); + info!("Updated battery from Batteries topic: soc={}, power={}, state={} ({} of {} topics received)", + soc, power, state_str, received_topics.len(), total_topics); + } + + if received_topics.len() >= total_topics { + info!("All {} properties updated successfully", total_topics); + break true; + } + } + _ => { + // Ignore other events (ConnAck, SubAck, etc.) + } + } + } + } + }; + + Ok(all_received) +} + +#[cfg(test)] +mod tests { + use super::*; + use tokio::net::TcpListener; + + #[test] + fn test_extract_serial_from_topic() { + assert_eq!( + extract_serial_from_topic("N/abc123/system/0/Serial"), + Some("abc123".to_string()) + ); + assert_eq!( + extract_serial_from_topic("N/xyz/system/0/Serial"), + Some("xyz".to_string()) + ); + assert_eq!( + extract_serial_from_topic("N//system/0/Serial"), + Some("".to_string()) + ); + assert_eq!(extract_serial_from_topic("N/abc123/system/0/Other"), None); + assert_eq!(extract_serial_from_topic("foo/bar"), None); + } + + #[tokio::test] + async fn test_pick_first_available_picks_open_second() { + // Start a listener for the second endpoint only + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + let endpoints = vec![ + ("127.0.0.1".to_string(), 1), // very likely closed + (addr.ip().to_string(), addr.port()), + ]; + + let chosen = pick_first_available(&endpoints).await.unwrap(); + assert_eq!(chosen, (addr.ip().to_string(), addr.port())); + + drop(listener); + } + + #[tokio::test] + async fn test_pick_first_available_prefers_first_when_both_open() { + let listener1 = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr1 = listener1.local_addr().unwrap(); + let listener2 = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr2 = listener2.local_addr().unwrap(); + + let endpoints = vec![ + (addr1.ip().to_string(), addr1.port()), + (addr2.ip().to_string(), addr2.port()), + ]; + + let chosen = pick_first_available(&endpoints).await.unwrap(); + assert_eq!(chosen, (addr1.ip().to_string(), addr1.port())); + + drop(listener1); + drop(listener2); + } + + #[test] + fn test_parse_numeric_value_direct() { + let value = "123.45".to_string(); + let input = Some(&value); + assert_eq!(parse_numeric_value(input), 123.45); + } + + #[test] + fn test_parse_numeric_value_json() { + let value = r#"{"value": 67.89}"#.to_string(); + let input = Some(&value); + assert_eq!(parse_numeric_value(input), 67.89); + } + + #[test] + fn test_parse_numeric_value_json_string() { + let value = r#"{"value": "90.12"}"#.to_string(); + let input = Some(&value); + assert_eq!(parse_numeric_value(input), 90.12); + } + + #[test] + fn test_parse_numeric_value_invalid() { + let value = "invalid".to_string(); + let input = Some(&value); + assert_eq!(parse_numeric_value(input), 0.0); + } + + #[test] + fn test_parse_numeric_value_none() { + let input = None; + assert_eq!(parse_numeric_value(input), 0.0); + } + + #[test] + fn test_parse_numeric_value_whitespace() { + let value = " 42.5 ".to_string(); + let input = Some(&value); + assert_eq!(parse_numeric_value(input), 42.5); + } + + #[test] + fn test_topic_property_mapping() { + let mappings = MqttPropertyMappings::new("test123"); + let topics = mappings.get_subscription_topics(); + + assert_eq!(topics.len(), 4); + assert!(topics.contains(&"N/test123/solarcharger/277/Yield/Power")); + assert!(topics.contains(&"N/test123/system/0/Ac/Grid/L1/Power")); + assert!(topics.contains(&"N/test123/system/0/Ac/Consumption/L1/Power")); + assert!(topics.contains(&"N/test123/system/0/Batteries")); + + // Test finding solar mapping + let solar_mapping = mappings.find_mapping("N/test123/solarcharger/277/Yield/Power"); + assert!(solar_mapping.is_some()); + assert_eq!( + solar_mapping.unwrap().property_type, + PropertyType::SolarPower + ); + + // Test finding AC input mapping + let ac_input_mapping = mappings.find_mapping("N/test123/system/0/Ac/Grid/L1/Power"); + assert!(ac_input_mapping.is_some()); + assert_eq!( + ac_input_mapping.unwrap().property_type, + PropertyType::AcInputPower + ); + + // Test finding AC load mapping + let ac_load_mapping = mappings.find_mapping("N/test123/system/0/Ac/Consumption/L1/Power"); + assert!(ac_load_mapping.is_some()); + assert_eq!( + ac_load_mapping.unwrap().property_type, + PropertyType::AcLoadPower + ); + + // Batteries topic is handled specially, not in find_mapping + assert!( + mappings + .find_mapping("N/test123/system/0/Batteries") + .is_none() + ); + assert!(mappings.is_batteries_topic("N/test123/system/0/Batteries")); + } + + // --- parse_numeric_value: fallback chain and design decisions --- + + #[test] + fn test_parse_numeric_value_negative() { + let value = "-123.45".to_string(); + assert_eq!(parse_numeric_value(Some(&value)), -123.45); + } + + #[test] + fn test_parse_numeric_value_json_negative() { + let value = r#"{"value": -67.89}"#.to_string(); + assert_eq!(parse_numeric_value(Some(&value)), -67.89); + } + + #[test] + fn test_parse_numeric_value_json_null() { + let value = r#"{"value": null}"#.to_string(); + assert_eq!(parse_numeric_value(Some(&value)), 0.0); + } + + #[test] + fn test_parse_numeric_value_json_missing_value_key() { + let value = r#"{"power": 42}"#.to_string(); + assert_eq!(parse_numeric_value(Some(&value)), 0.0); + } + + #[test] + fn test_parse_numeric_value_empty_string() { + let value = "".to_string(); + assert_eq!(parse_numeric_value(Some(&value)), 0.0); + } + + #[test] + fn test_parse_numeric_value_json_with_extra_fields() { + let value = r#"{"value": 42, "unit": "W"}"#.to_string(); + assert_eq!(parse_numeric_value(Some(&value)), 42.0); + } + + #[test] + fn test_parse_numeric_value_json_non_numeric_string() { + let value = r#"{"value": "n/a"}"#.to_string(); + assert_eq!(parse_numeric_value(Some(&value)), 0.0); + } + + // --- extract_serial_from_topic: boundary cases --- + + #[test] + fn test_extract_serial_empty_topic() { + assert_eq!(extract_serial_from_topic(""), None); + } + + #[test] + fn test_extract_serial_data_topic_rejected() { + // Data topics must not be mistaken for serial discovery topics + assert_eq!( + extract_serial_from_topic("N/abc/battery/278/Dc/0/Power"), + None + ); + } + + // --- MqttPropertyMappings: Victron device mapping design --- + + #[test] + fn test_mapping_find_returns_none_for_unknown_topic() { + let mappings = MqttPropertyMappings::new("serial1"); + assert!(mappings.find_mapping("N/serial1/unknown/0/Foo").is_none()); + } + + #[test] + fn test_mapping_solar_power_exists() { + let mappings = MqttPropertyMappings::new("s1"); + let solar = mappings + .find_mapping("N/s1/solarcharger/277/Yield/Power") + .expect("solar mapping should exist"); + assert_eq!(solar.property_type, PropertyType::SolarPower); + } + + #[test] + fn test_mapping_different_serials_dont_cross_match() { + let mappings_a = MqttPropertyMappings::new("AAA"); + let mappings_b = MqttPropertyMappings::new("BBB"); + // Simple mappings from serial AAA must not match in serial BBB + for topic in mappings_a.get_subscription_topics() { + assert!( + mappings_b.find_mapping(topic).is_none(), + "serial BBB should not match topic from serial AAA: {topic}" + ); + if mappings_a.is_batteries_topic(topic) { + assert!( + !mappings_b.is_batteries_topic(topic), + "serial BBB batteries topic should not match serial AAA: {topic}" + ); + } + } + } + + #[test] + fn test_parse_batteries_payload_charging() { + let payload = r#"{"value":[{"active_battery_service":true,"current":1.8,"id":"com.victronenergy.battery.ttyS7","instance":279,"name":"Shunt","power":48.13,"soc":67.1,"state":1,"voltage":26.74}]}"#; + let (soc, power, state) = parse_batteries_payload(payload).unwrap(); + assert!((soc - 67.1).abs() < 0.01); + assert!((power - 48.13).abs() < 0.01); + assert_eq!(state, "charging"); + } + + #[test] + fn test_parse_batteries_payload_discharging() { + let payload = r#"{"value":[{"soc":50.0,"power":-334.78,"state":2}]}"#; + let (soc, power, state) = parse_batteries_payload(payload).unwrap(); + assert!((soc - 50.0).abs() < 0.01); + assert!((power - (-334.78)).abs() < 0.01); + assert_eq!(state, "discharging"); + } + + #[test] + fn test_parse_batteries_payload_idle() { + let payload = r#"{"value":[{"soc":100.0,"power":0.0,"state":0}]}"#; + let (soc, _power, state) = parse_batteries_payload(payload).unwrap(); + assert!((soc - 100.0).abs() < 0.01); + assert_eq!(state, "idle"); + } + + #[test] + fn test_parse_batteries_payload_invalid() { + assert!(parse_batteries_payload("not json").is_none()); + assert!(parse_batteries_payload(r#"{"value":[]}"#).is_none()); + assert!(parse_batteries_payload(r#"{"value":"nope"}"#).is_none()); + } + + #[test] + fn test_mapping_topics_embed_serial() { + let serial = "mySerial42"; + let mappings = MqttPropertyMappings::new(serial); + for topic in mappings.get_subscription_topics() { + assert!( + topic.contains(serial), + "topic should contain serial: {topic}" + ); + } + } + + // --- build_mqtt_options: configuration policy --- + + #[test] + fn test_build_mqtt_options_applies_credentials_when_present() { + let cfg = Config { + endpoints: vec![("127.0.0.1".to_string(), 1883)], + username: Some("user".to_string()), + password: Some("pass".to_string()), + client_id: "test-client".to_string(), + refresh_interval_seconds: 60, + }; + let opts = build_mqtt_options(&cfg, "127.0.0.1", 1883); + let login = opts.credentials().expect("credentials should be set"); + assert_eq!(login.username, "user"); + assert_eq!(login.password, "pass"); + assert_eq!(opts.keep_alive(), Duration::from_secs(10)); + } + + #[test] + fn test_build_mqtt_options_skips_credentials_when_absent() { + let cfg = Config { + endpoints: vec![("127.0.0.1".to_string(), 1883)], + username: None, + password: None, + client_id: "anon-client".to_string(), + refresh_interval_seconds: 60, + }; + let opts = build_mqtt_options(&cfg, "127.0.0.1", 1883); + assert!( + opts.credentials().is_none(), + "anonymous mode should have no credentials" + ); + assert_eq!(opts.client_id(), "anon-client"); + assert_eq!(opts.keep_alive(), Duration::from_secs(10)); + } +} |
