summaryrefslogtreecommitdiff
path: root/service/src
diff options
context:
space:
mode:
Diffstat (limited to 'service/src')
-rw-r--r--service/src/cache.rs63
-rw-r--r--service/src/config.rs244
-rw-r--r--service/src/dbus.rs667
-rw-r--r--service/src/lib.rs4
-rw-r--r--service/src/main.rs238
-rw-r--r--service/src/victron_mqtt.rs576
6 files changed, 1792 insertions, 0 deletions
diff --git a/service/src/cache.rs b/service/src/cache.rs
new file mode 100644
index 0000000..acaf9d3
--- /dev/null
+++ b/service/src/cache.rs
@@ -0,0 +1,63 @@
+use std::{
+ path::PathBuf,
+ time::{Duration, SystemTime},
+};
+
+use anyhow::{Result, anyhow};
+
+#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
+struct SerialCacheEntry {
+ serial: String,
+ expires_at_unix: u64,
+}
+
+fn cache_file_path() -> Result<PathBuf> {
+ let proj_dir = dirs::cache_dir()
+ .ok_or_else(|| anyhow!("No cache dir"))?
+ .join("camper-widget-refresh");
+ Ok(proj_dir.join("serial.json"))
+}
+
+pub async fn read_serial_cache() -> Result<Option<String>> {
+ let path = cache_file_path()?;
+ if !path.exists() {
+ return Ok(None);
+ }
+ let content = tokio::fs::read_to_string(&path)
+ .await
+ .map_err(|e| anyhow!("Failed to read cache file at {}: {}", path.display(), e))?;
+ let entry: SerialCacheEntry = serde_json::from_str(&content)?;
+ let now = SystemTime::now()
+ .duration_since(SystemTime::UNIX_EPOCH)?
+ .as_secs();
+ if now <= entry.expires_at_unix {
+ Ok(Some(entry.serial))
+ } else {
+ Ok(None)
+ }
+}
+
+pub async fn write_serial_cache(serial: &str, ttl: Duration) -> Result<()> {
+ let path = cache_file_path()?;
+ if let Some(parent) = path.parent() {
+ tokio::fs::create_dir_all(parent).await.map_err(|e| {
+ anyhow!(
+ "Failed to create cache directory {}: {}",
+ parent.display(),
+ e
+ )
+ })?;
+ }
+ let now = SystemTime::now()
+ .duration_since(SystemTime::UNIX_EPOCH)?
+ .as_secs();
+ let entry = SerialCacheEntry {
+ serial: serial.to_string(),
+ expires_at_unix: now + ttl.as_secs(),
+ };
+ let data = serde_json::to_string(&entry)?;
+ tokio::fs::write(&path, data)
+ .await
+ .map_err(|e| anyhow!("Failed to write cache file at {}: {}", path.display(), e))?;
+ Ok(())
+}
diff --git a/service/src/config.rs b/service/src/config.rs
new file mode 100644
index 0000000..dd98ec7
--- /dev/null
+++ b/service/src/config.rs
@@ -0,0 +1,244 @@
+use anyhow::Result;
+use configparser::ini::Ini;
+use std::path::PathBuf;
+use std::sync::Arc;
+use tokio::sync::RwLock;
+use tokio_util::sync::CancellationToken;
+use tracing::{debug, info, warn};
+use zbus::Connection;
+
+#[derive(Clone, Debug)]
+pub struct Config {
+ pub endpoints: Vec<(String, u16)>,
+ pub username: Option<String>,
+ pub password: Option<String>,
+ pub client_id: String,
+ pub refresh_interval_seconds: u64,
+}
+
+fn default_port() -> u16 {
+ 1883
+}
+fn default_client_id() -> String {
+ "camper-widget-refresh".to_string()
+}
+fn default_refresh_interval() -> u64 {
+ 60
+}
+// log level is now controlled via RUST_LOG in main; no per-config default needed
+
+fn plasma_applet_src_path() -> Option<PathBuf> {
+ dirs::config_dir().map(|d| d.join("plasma-org.kde.plasma.desktop-appletsrc"))
+}
+
+fn find_applet_section_id(ini: &Ini) -> Option<String> {
+ for section in ini.sections() {
+ if let Some(plugin) = ini.get(&section, "plugin")
+ && plugin == "craftknight.camper_widget"
+ {
+ return Some(section);
+ }
+ }
+ warn!("No applet section id found");
+ None
+}
+
+fn unescape_kde_value(s: &str) -> String {
+ s.replace("\\s", " ")
+ .replace("\\t", "\t")
+ .replace("\\n", "\n")
+ .replace("\\\\", "\\")
+}
+
+fn parse_mqtt_hosts(raw: &str) -> Vec<(String, u16)> {
+ let unescaped = unescape_kde_value(raw);
+ let mut out: Vec<(String, u16)> = Vec::new();
+ for entry in unescaped.split(',') {
+ let part = entry.trim();
+ if part.is_empty() {
+ continue;
+ }
+ if let Some((host, port_str)) = part.rsplit_once(':')
+ && let Ok(port) = port_str.parse::<u16>()
+ {
+ out.push((host.trim().to_string(), port));
+ continue;
+ }
+ out.push((part.to_string(), default_port()));
+ }
+ if out.is_empty() {
+ out.push(("127.0.0.1".to_string(), default_port()));
+ }
+ out
+}
+
+pub async fn load_config() -> Config {
+ // Defaults
+ let mut endpoints: Vec<(String, u16)> = vec![("127.0.0.1".to_string(), default_port())];
+ let mut username: Option<String> = None;
+ let mut password: Option<String> = None;
+ let client_id = default_client_id();
+ let mut refresh_interval_seconds = default_refresh_interval();
+
+ if let Some(path) = plasma_applet_src_path() {
+ debug!("Reading Plasma INI config from {:?}", path);
+ match tokio::fs::read_to_string(&path).await {
+ Ok(contents) => {
+ let mut ini = Ini::new();
+ if let Err(e) = ini.read(contents) {
+ warn!("Failed to parse Plasma INI config at {:?}: {}", path, e);
+ }
+ let sections = ini.sections();
+ debug!("Found {} INI sections", sections.len());
+ if let Some(base) = find_applet_section_id(&ini) {
+ debug!("Matched applet section id: {}", base);
+ // Build the [Configuration][General] section name from the found applet base
+ let sec_general = format!("{}][configuration][general", base);
+ if ini.sections().contains(&sec_general) {
+ debug!("Using section: {}", sec_general);
+ debug!("Looking under primary section: {}", sec_general);
+ let mut read_any = false;
+ let section = &sec_general;
+ debug!("Attempt reading values from section: {}", section);
+ if let Some(raw_hosts) = ini.get(section, "mqttHosts") {
+ debug!("Found mqttHosts='{}'", raw_hosts);
+ endpoints = parse_mqtt_hosts(&raw_hosts);
+ debug!("Parsed endpoints: {:?}", endpoints);
+ read_any = true;
+ }
+ if let Some(u) = ini.get(section, "mqttUsername")
+ && !u.is_empty()
+ {
+ username = Some(u);
+ debug!("Found mqttUsername set");
+ read_any = true;
+ }
+ if let Some(p) = ini.get(section, "mqttPassword")
+ && !p.is_empty()
+ {
+ password = Some(p);
+ debug!("Found mqttPassword set (redacted)");
+ read_any = true;
+ }
+ if let Some(sec) = ini.get(section, "refreshIntervalSeconds")
+ && let Ok(v) = sec.trim().parse::<u64>()
+ {
+ refresh_interval_seconds = v;
+ debug!("Found refreshIntervalSeconds={}", v);
+ read_any = true;
+ }
+ if !read_any {
+ warn!("No configuration keys found under {}", section);
+ }
+ } else {
+ warn!("Could not find target configuration section");
+ }
+ } else {
+ warn!("No applet section id found");
+ }
+ }
+ Err(e) => {
+ warn!("Failed to read Plasma INI config at {:?}: {}", path, e);
+ }
+ }
+ } else {
+ warn!("No config dir found (dirs::config_dir returned None)");
+ }
+
+ Config {
+ endpoints,
+ username,
+ password,
+ client_id,
+ refresh_interval_seconds,
+ }
+}
+
+pub async fn listen_for_config_changes(
+ _conn: &Connection,
+ config_state: Arc<RwLock<Config>>,
+ shared_state: &crate::dbus::SharedState,
+ token: CancellationToken,
+) -> Result<()> {
+ info!("Starting config reload monitor");
+
+ loop {
+ tokio::select! {
+ _ = shared_state.config_reload_notify.notified() => {}
+ _ = token.cancelled() => {
+ info!("Config reload monitor shutting down");
+ return Ok(());
+ }
+ }
+
+ info!("Config reload triggered, reloading configuration");
+
+ let new_config = load_config().await;
+
+ {
+ let mut config_guard = config_state.write().await;
+ *config_guard = new_config;
+ }
+
+ info!("Configuration reloaded successfully");
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_parse_mqtt_hosts_various() {
+ assert_eq!(
+ parse_mqtt_hosts("127.0.0.1"),
+ vec![("127.0.0.1".to_string(), 1883)]
+ );
+ assert_eq!(
+ parse_mqtt_hosts("127.0.0.1:1883"),
+ vec![("127.0.0.1".to_string(), 1883)]
+ );
+ assert_eq!(
+ parse_mqtt_hosts("127.0.0.1,192.168.1.111"),
+ vec![
+ ("127.0.0.1".to_string(), 1883),
+ ("192.168.1.111".to_string(), 1883)
+ ]
+ );
+ assert_eq!(
+ parse_mqtt_hosts("127.0.0.1:1883,192.168.1.111"),
+ vec![
+ ("127.0.0.1".to_string(), 1883),
+ ("192.168.1.111".to_string(), 1883)
+ ]
+ );
+ assert_eq!(
+ parse_mqtt_hosts("127.0.0.1:1883,192.168.1.111:1833"),
+ vec![
+ ("127.0.0.1".to_string(), 1883),
+ ("192.168.1.111".to_string(), 1833)
+ ]
+ );
+ // KDE config escapes leading spaces as \s
+ assert_eq!(
+ parse_mqtt_hosts("\\s192.168.10.202 :1883"),
+ vec![("192.168.10.202".to_string(), 1883)]
+ );
+ }
+
+ #[test]
+ fn test_find_applet_section_id() {
+ let mut ini = Ini::new();
+ // Base section like [Containments][85][Applets][133]
+ ini.set(
+ "Containments][85][Applets][133",
+ "plugin",
+ Some("craftknight.camper_widget".to_string()),
+ );
+ let found = find_applet_section_id(&ini);
+ // configparser normalizes section names to lowercase
+ assert_eq!(found, Some("containments][85][applets][133".to_string()));
+ }
+
+ // We avoid testing load_config() directly because it depends on user's plasma file.
+}
diff --git a/service/src/dbus.rs b/service/src/dbus.rs
new file mode 100644
index 0000000..2a07134
--- /dev/null
+++ b/service/src/dbus.rs
@@ -0,0 +1,667 @@
+use std::sync::Arc;
+
+use anyhow::Result;
+use tokio::sync::{Notify, RwLock, Semaphore};
+
+use tracing::{error, warn};
+use zbus::{Connection, connection, interface};
+
+// Object paths
+pub const ROOT_OBJECT_PATH: &str = "/org/craftknight/camper_widget";
+pub const BATTERY_OBJECT_PATH: &str = "/org/craftknight/camper_widget/Battery";
+pub const SOLAR_OBJECT_PATH: &str = "/org/craftknight/camper_widget/Solar";
+pub const AC_INPUT_OBJECT_PATH: &str = "/org/craftknight/camper_widget/AcInput";
+pub const AC_LOAD_OBJECT_PATH: &str = "/org/craftknight/camper_widget/AcLoad";
+
+#[derive(Clone)]
+pub struct SharedState {
+ // Status interface data
+ pub connected: Arc<RwLock<bool>>,
+
+ // Battery interface data
+ pub battery_soc: Arc<RwLock<f64>>,
+ pub battery_power: Arc<RwLock<f64>>,
+ pub battery_state: Arc<RwLock<String>>,
+
+ // Solar interface data
+ pub solar_power: Arc<RwLock<f64>>,
+
+ // AC input interface data
+ pub ac_input_power: Arc<RwLock<f64>>,
+
+ // AC load interface data
+ pub ac_load_power: Arc<RwLock<f64>>,
+
+ // Config reload notification
+ pub config_reload_notify: Arc<Notify>,
+
+ // Backpressure for D-Bus signal emission
+ pub signal_semaphore: Arc<Semaphore>,
+}
+
+impl Default for SharedState {
+ fn default() -> Self {
+ Self {
+ connected: Arc::new(RwLock::new(false)),
+ battery_soc: Arc::new(RwLock::new(0.0)),
+ battery_power: Arc::new(RwLock::new(0.0)),
+ battery_state: Arc::new(RwLock::new("idle".to_string())),
+ solar_power: Arc::new(RwLock::new(0.0)),
+ ac_input_power: Arc::new(RwLock::new(0.0)),
+ ac_load_power: Arc::new(RwLock::new(0.0)),
+ config_reload_notify: Arc::new(Notify::new()),
+ signal_semaphore: Arc::new(Semaphore::new(10)),
+ }
+ }
+}
+
+#[derive(Clone)]
+pub struct StatusInterface {
+ state: SharedState,
+}
+
+impl StatusInterface {
+ fn new(state: SharedState) -> Self {
+ Self { state }
+ }
+}
+
+#[derive(Clone)]
+pub struct BatteryInterface {
+ state: SharedState,
+}
+
+impl BatteryInterface {
+ fn new(state: SharedState) -> Self {
+ Self { state }
+ }
+}
+
+#[derive(Clone)]
+pub struct SolarInterface {
+ state: SharedState,
+}
+
+impl SolarInterface {
+ fn new(state: SharedState) -> Self {
+ Self { state }
+ }
+}
+
+#[derive(Clone)]
+pub struct AcInputInterface {
+ state: SharedState,
+}
+
+impl AcInputInterface {
+ fn new(state: SharedState) -> Self {
+ Self { state }
+ }
+}
+
+#[derive(Clone)]
+pub struct AcLoadInterface {
+ state: SharedState,
+}
+
+impl AcLoadInterface {
+ fn new(state: SharedState) -> Self {
+ Self { state }
+ }
+}
+
+#[derive(Clone)]
+pub struct ConfigInterface {
+ #[allow(dead_code)]
+ state: SharedState,
+}
+
+impl ConfigInterface {
+ fn new(state: SharedState) -> Self {
+ Self { state }
+ }
+}
+
+// Status interface - Connected property
+#[interface(name = "org.craftknight.CamperWidget.Status")]
+impl StatusInterface {
+ #[zbus(property(emits_changed_signal = "true"))]
+ async fn connected(&self) -> bool {
+ *self.state.connected.read().await
+ }
+}
+
+// Battery interface - SoC, Power, State properties
+#[interface(name = "org.craftknight.CamperWidget.Battery")]
+impl BatteryInterface {
+ #[zbus(property(emits_changed_signal = "true"))]
+ async fn soc(&self) -> f64 {
+ *self.state.battery_soc.read().await
+ }
+
+ #[zbus(property(emits_changed_signal = "true"))]
+ async fn power(&self) -> f64 {
+ *self.state.battery_power.read().await
+ }
+
+ #[zbus(property(emits_changed_signal = "true"))]
+ async fn state(&self) -> String {
+ self.state.battery_state.read().await.clone()
+ }
+}
+
+// Solar interface - Power property
+#[interface(name = "org.craftknight.CamperWidget.Solar")]
+impl SolarInterface {
+ #[zbus(property(emits_changed_signal = "true"))]
+ async fn power(&self) -> f64 {
+ *self.state.solar_power.read().await
+ }
+}
+
+// AC input interface - Power property
+#[interface(name = "org.craftknight.CamperWidget.AcInput")]
+impl AcInputInterface {
+ #[zbus(property(emits_changed_signal = "true"))]
+ async fn power(&self) -> f64 {
+ *self.state.ac_input_power.read().await
+ }
+}
+
+// AC load interface - Power property
+#[interface(name = "org.craftknight.CamperWidget.AcLoad")]
+impl AcLoadInterface {
+ #[zbus(property(emits_changed_signal = "true"))]
+ async fn power(&self) -> f64 {
+ *self.state.ac_load_power.read().await
+ }
+}
+
+// Config interface for handling configuration reloads
+#[interface(name = "org.craftknight.CamperWidget.Config")]
+impl ConfigInterface {
+ /// Reload configuration method
+ async fn reload(&self) -> zbus::fdo::Result<()> {
+ self.state.config_reload_notify.notify_one();
+ Ok(())
+ }
+}
+
+// Start a session bus service and return the connection; keep it alive in main
+pub async fn start_service() -> Result<(Connection, SharedState)> {
+ let shared = SharedState::default();
+
+ let status_server = StatusInterface::new(shared.clone());
+ let battery_server = BatteryInterface::new(shared.clone());
+ let solar_server = SolarInterface::new(shared.clone());
+ let ac_input_server = AcInputInterface::new(shared.clone());
+ let ac_load_server = AcLoadInterface::new(shared.clone());
+ let config_server = ConfigInterface::new(shared.clone());
+
+ let conn = connection::Builder::session()?
+ .name("org.craftknight.CamperWidget")?
+ .serve_at(ROOT_OBJECT_PATH, status_server)?
+ .serve_at(ROOT_OBJECT_PATH, config_server)?
+ .serve_at(BATTERY_OBJECT_PATH, battery_server)?
+ .serve_at(SOLAR_OBJECT_PATH, solar_server)?
+ .serve_at(AC_INPUT_OBJECT_PATH, ac_input_server)?
+ .serve_at(AC_LOAD_OBJECT_PATH, ac_load_server)?
+ .build()
+ .await?;
+
+ Ok((conn, shared))
+}
+
+/// Start a D-Bus service at a custom address (for integration testing).
+pub async fn start_service_at(address: &str) -> Result<(Connection, SharedState)> {
+ let shared = SharedState::default();
+
+ let status_server = StatusInterface::new(shared.clone());
+ let battery_server = BatteryInterface::new(shared.clone());
+ let solar_server = SolarInterface::new(shared.clone());
+ let ac_input_server = AcInputInterface::new(shared.clone());
+ let ac_load_server = AcLoadInterface::new(shared.clone());
+ let config_server = ConfigInterface::new(shared.clone());
+
+ let conn = connection::Builder::address(address)?
+ .name("org.craftknight.CamperWidget")?
+ .serve_at(ROOT_OBJECT_PATH, status_server)?
+ .serve_at(ROOT_OBJECT_PATH, config_server)?
+ .serve_at(BATTERY_OBJECT_PATH, battery_server)?
+ .serve_at(SOLAR_OBJECT_PATH, solar_server)?
+ .serve_at(AC_INPUT_OBJECT_PATH, ac_input_server)?
+ .serve_at(AC_LOAD_OBJECT_PATH, ac_load_server)?
+ .build()
+ .await?;
+
+ Ok((conn, shared))
+}
+
+// Property types for individual updates
+#[derive(Debug, Clone, PartialEq)]
+pub enum PropertyType {
+ BatterySoc,
+ BatteryPower,
+ BatteryState,
+ SolarPower,
+ AcInputPower,
+ AcLoadPower,
+}
+
+// Update connected status and emit signal
+pub async fn update_connected_status(
+ conn: &Connection,
+ state: &SharedState,
+ connected: bool,
+) -> Result<()> {
+ let object_server = conn.object_server();
+
+ let mut guard = state.connected.write().await;
+ if *guard != connected {
+ *guard = connected;
+ // Emit signal for connected status asynchronously
+ let object_server_clone = object_server.clone();
+ match state.signal_semaphore.clone().try_acquire_owned() {
+ Ok(permit) => {
+ tokio::spawn(async move {
+ let _permit = permit;
+ if let Ok(iface_ref) = object_server_clone
+ .interface::<_, StatusInterface>(ROOT_OBJECT_PATH)
+ .await
+ {
+ let iface = iface_ref.get_mut().await;
+ if let Err(e) = iface.connected_changed(iface_ref.signal_emitter()).await {
+ error!("Connected: Failed to emit signal: {}", e);
+ }
+ } else {
+ error!("Connected: Failed to get interface at {}", ROOT_OBJECT_PATH);
+ }
+ });
+ }
+ Err(_) => {
+ warn!("Signal emission backpressure: dropping signal for Connected");
+ }
+ }
+ }
+
+ Ok(())
+}
+
+// Update battery state from a string directly (used by Batteries topic)
+pub async fn update_battery_state_str(
+ conn: &Connection,
+ state: &SharedState,
+ state_str: &str,
+) -> Result<()> {
+ let object_server = conn.object_server();
+ let mut guard = state.battery_state.write().await;
+ if *guard != state_str {
+ *guard = state_str.to_string();
+ let object_server_clone = object_server.clone();
+ match state.signal_semaphore.clone().try_acquire_owned() {
+ Ok(permit) => {
+ tokio::spawn(async move {
+ let _permit = permit;
+ if let Ok(iface_ref) = object_server_clone
+ .interface::<_, BatteryInterface>(BATTERY_OBJECT_PATH)
+ .await
+ {
+ let iface = iface_ref.get_mut().await;
+ if let Err(e) = iface.state_changed(iface_ref.signal_emitter()).await {
+ error!("BatteryState: Failed to emit signal: {}", e);
+ }
+ }
+ });
+ }
+ Err(_) => {
+ warn!("Signal emission backpressure: dropping signal for BatteryState");
+ }
+ }
+ }
+ Ok(())
+}
+
+// Update a single DBus property immediately
+pub async fn update_single_property(
+ conn: &Connection,
+ state: &SharedState,
+ property_type: &PropertyType,
+ value: f64,
+) -> Result<()> {
+ let object_server = conn.object_server();
+
+ match property_type {
+ PropertyType::BatterySoc => {
+ let mut guard = state.battery_soc.write().await;
+ if (*guard - value).abs() > f64::EPSILON {
+ *guard = value;
+ // Emit signal for battery SoC asynchronously
+ let object_server_clone = object_server.clone();
+ match state.signal_semaphore.clone().try_acquire_owned() {
+ Ok(permit) => {
+ tokio::spawn(async move {
+ let _permit = permit;
+ if let Ok(iface_ref) = object_server_clone
+ .interface::<_, BatteryInterface>(BATTERY_OBJECT_PATH)
+ .await
+ {
+ let iface = iface_ref.get_mut().await;
+ if let Err(e) = iface.soc_changed(iface_ref.signal_emitter()).await
+ {
+ error!("BatterySoc: Failed to emit signal: {}", e);
+ }
+ } else {
+ error!(
+ "BatterySoc: Failed to get interface at {}",
+ BATTERY_OBJECT_PATH
+ );
+ }
+ });
+ }
+ Err(_) => {
+ warn!("Signal emission backpressure: dropping signal for BatterySoc");
+ }
+ }
+ }
+ }
+ PropertyType::BatteryPower => {
+ let mut guard = state.battery_power.write().await;
+ if (*guard - value).abs() > f64::EPSILON {
+ *guard = value;
+ // Emit signal for battery power asynchronously
+ let object_server_clone = object_server.clone();
+ match state.signal_semaphore.clone().try_acquire_owned() {
+ Ok(permit) => {
+ tokio::spawn(async move {
+ let _permit = permit;
+ if let Ok(iface_ref) = object_server_clone
+ .interface::<_, BatteryInterface>(BATTERY_OBJECT_PATH)
+ .await
+ {
+ let iface = iface_ref.get_mut().await;
+ if let Err(e) =
+ iface.power_changed(iface_ref.signal_emitter()).await
+ {
+ error!("BatteryPower: Failed to emit signal: {}", e);
+ }
+ } else {
+ error!(
+ "BatteryPower: Failed to get interface at {}",
+ BATTERY_OBJECT_PATH
+ );
+ }
+ });
+ }
+ Err(_) => {
+ warn!("Signal emission backpressure: dropping signal for BatteryPower");
+ }
+ }
+ }
+ }
+ PropertyType::BatteryState => {
+ let state_str = if value > 5.0 {
+ "charging".to_string()
+ } else if value < -5.0 {
+ "discharging".to_string()
+ } else {
+ "idle".to_string()
+ };
+
+ let mut guard = state.battery_state.write().await;
+ if *guard != state_str {
+ *guard = state_str.clone();
+ // Emit signal for battery state asynchronously
+ let object_server_clone = object_server.clone();
+ match state.signal_semaphore.clone().try_acquire_owned() {
+ Ok(permit) => {
+ tokio::spawn(async move {
+ let _permit = permit;
+ if let Ok(iface_ref) = object_server_clone
+ .interface::<_, BatteryInterface>(BATTERY_OBJECT_PATH)
+ .await
+ {
+ let iface = iface_ref.get_mut().await;
+ if let Err(e) =
+ iface.state_changed(iface_ref.signal_emitter()).await
+ {
+ error!("BatteryState: Failed to emit signal: {}", e);
+ }
+ } else {
+ error!(
+ "BatteryState: Failed to get interface at {}",
+ BATTERY_OBJECT_PATH
+ );
+ }
+ });
+ }
+ Err(_) => {
+ warn!("Signal emission backpressure: dropping signal for BatteryState");
+ }
+ }
+ }
+ }
+ PropertyType::SolarPower => {
+ let mut guard = state.solar_power.write().await;
+ if (*guard - value).abs() > f64::EPSILON {
+ *guard = value;
+ // Emit signal for solar power asynchronously
+ let object_server_clone = object_server.clone();
+ match state.signal_semaphore.clone().try_acquire_owned() {
+ Ok(permit) => {
+ tokio::spawn(async move {
+ let _permit = permit;
+ if let Ok(iface_ref) = object_server_clone
+ .interface::<_, SolarInterface>(SOLAR_OBJECT_PATH)
+ .await
+ {
+ let iface = iface_ref.get_mut().await;
+ if let Err(e) =
+ iface.power_changed(iface_ref.signal_emitter()).await
+ {
+ error!("SolarPower: Failed to emit signal: {}", e);
+ }
+ } else {
+ error!(
+ "SolarPower: Failed to get interface at {}",
+ SOLAR_OBJECT_PATH
+ );
+ }
+ });
+ }
+ Err(_) => {
+ warn!("Signal emission backpressure: dropping signal for SolarPower");
+ }
+ }
+ }
+ }
+ PropertyType::AcInputPower => {
+ let mut guard = state.ac_input_power.write().await;
+ if (*guard - value).abs() > f64::EPSILON {
+ *guard = value;
+ let object_server_clone = object_server.clone();
+ match state.signal_semaphore.clone().try_acquire_owned() {
+ Ok(permit) => {
+ tokio::spawn(async move {
+ let _permit = permit;
+ if let Ok(iface_ref) = object_server_clone
+ .interface::<_, AcInputInterface>(AC_INPUT_OBJECT_PATH)
+ .await
+ {
+ let iface = iface_ref.get_mut().await;
+ if let Err(e) =
+ iface.power_changed(iface_ref.signal_emitter()).await
+ {
+ error!("AcInputPower: Failed to emit signal: {}", e);
+ }
+ } else {
+ error!(
+ "AcInputPower: Failed to get interface at {}",
+ AC_INPUT_OBJECT_PATH
+ );
+ }
+ });
+ }
+ Err(_) => {
+ warn!("Signal emission backpressure: dropping signal for AcInputPower");
+ }
+ }
+ }
+ }
+ PropertyType::AcLoadPower => {
+ let mut guard = state.ac_load_power.write().await;
+ if (*guard - value).abs() > f64::EPSILON {
+ *guard = value;
+ let object_server_clone = object_server.clone();
+ match state.signal_semaphore.clone().try_acquire_owned() {
+ Ok(permit) => {
+ tokio::spawn(async move {
+ let _permit = permit;
+ if let Ok(iface_ref) = object_server_clone
+ .interface::<_, AcLoadInterface>(AC_LOAD_OBJECT_PATH)
+ .await
+ {
+ let iface = iface_ref.get_mut().await;
+ if let Err(e) =
+ iface.power_changed(iface_ref.signal_emitter()).await
+ {
+ error!("AcLoadPower: Failed to emit signal: {}", e);
+ }
+ } else {
+ error!(
+ "AcLoadPower: Failed to get interface at {}",
+ AC_LOAD_OBJECT_PATH
+ );
+ }
+ });
+ }
+ Err(_) => {
+ warn!("Signal emission backpressure: dropping signal for AcLoadPower");
+ }
+ }
+ }
+ }
+ }
+
+ Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[tokio::test]
+ async fn test_shared_state_initialization() {
+ let state = SharedState::default();
+ assert!(!(*state.connected.read().await));
+ assert_eq!(*state.battery_soc.read().await, 0.0);
+ assert_eq!(*state.battery_power.read().await, 0.0);
+ assert_eq!(*state.battery_state.read().await, "idle");
+ assert_eq!(*state.solar_power.read().await, 0.0);
+ assert_eq!(*state.ac_input_power.read().await, 0.0);
+ assert_eq!(*state.ac_load_power.read().await, 0.0);
+ }
+
+ #[tokio::test]
+ async fn test_property_updates() {
+ let state = SharedState::default();
+
+ // Test connected status update
+ {
+ let mut guard = state.connected.write().await;
+ *guard = true;
+ }
+ assert!(*state.connected.read().await);
+
+ // Test battery property updates
+ {
+ let mut soc_guard = state.battery_soc.write().await;
+ *soc_guard = 75.0;
+ }
+ {
+ let mut power_guard = state.battery_power.write().await;
+ *power_guard = -15.0;
+ }
+ {
+ let mut state_guard = state.battery_state.write().await;
+ *state_guard = "discharging".to_string();
+ }
+
+ assert_eq!(*state.battery_soc.read().await, 75.0);
+ assert_eq!(*state.battery_power.read().await, -15.0);
+ assert_eq!(*state.battery_state.read().await, "discharging");
+
+ // Test solar power update
+ {
+ let mut solar_guard = state.solar_power.write().await;
+ *solar_guard = 120.0;
+ }
+ assert_eq!(*state.solar_power.read().await, 120.0);
+
+ // Test AC input power update
+ {
+ let mut ac_input_guard = state.ac_input_power.write().await;
+ *ac_input_guard = 82.0;
+ }
+ assert_eq!(*state.ac_input_power.read().await, 82.0);
+
+ // Test AC load power update
+ {
+ let mut ac_load_guard = state.ac_load_power.write().await;
+ *ac_load_guard = 77.0;
+ }
+ assert_eq!(*state.ac_load_power.read().await, 77.0);
+ }
+
+ #[tokio::test]
+ async fn test_interface_creation() {
+ let state = SharedState::default();
+
+ let _status_interface = StatusInterface::new(state.clone());
+ let _battery_interface = BatteryInterface::new(state.clone());
+ let _solar_interface = SolarInterface::new(state.clone());
+ let _ac_input_interface = AcInputInterface::new(state.clone());
+ let _ac_load_interface = AcLoadInterface::new(state.clone());
+ let _config_interface = ConfigInterface::new(state.clone());
+
+ // Test that interfaces can be created without panicking
+ // If we get here, creation succeeded
+ }
+
+ #[test]
+ fn test_property_change_detection() {
+ // Test floating point comparison logic without async
+ let value1: f64 = 50.0;
+ let value2: f64 = 50.1;
+
+ assert!((value1 - 50.0_f64).abs() < f64::EPSILON);
+ assert!((value2 - 50.1_f64).abs() < f64::EPSILON);
+ assert!((value1 - value2).abs() > f64::EPSILON);
+ }
+
+ #[tokio::test]
+ async fn test_battery_state_mapping() {
+ // Test the battery state mapping logic from main.rs
+ let test_cases = vec![
+ (10.0, "charging"),
+ (-10.0, "discharging"),
+ (0.0, "idle"),
+ (3.0, "idle"), // Below threshold
+ (-3.0, "idle"), // Above negative threshold
+ ];
+
+ for (power, expected_state) in test_cases {
+ let direction = if power > 5.0 {
+ "charging".to_string()
+ } else if power < -5.0 {
+ "discharging".to_string()
+ } else {
+ "idle".to_string()
+ };
+
+ assert_eq!(
+ direction, expected_state,
+ "Power {power} should map to {expected_state}"
+ );
+ }
+ }
+}
diff --git a/service/src/lib.rs b/service/src/lib.rs
new file mode 100644
index 0000000..56bf445
--- /dev/null
+++ b/service/src/lib.rs
@@ -0,0 +1,4 @@
+pub mod cache;
+pub mod config;
+pub mod dbus;
+pub mod victron_mqtt;
diff --git a/service/src/main.rs b/service/src/main.rs
new file mode 100644
index 0000000..51f7ff7
--- /dev/null
+++ b/service/src/main.rs
@@ -0,0 +1,238 @@
+use std::sync::Arc;
+use std::time::Duration;
+
+use anyhow::Result;
+use camper_widget_refresh::cache::{read_serial_cache, write_serial_cache};
+use camper_widget_refresh::config::{Config, listen_for_config_changes, load_config};
+use camper_widget_refresh::dbus::{self, start_service, update_connected_status};
+use camper_widget_refresh::victron_mqtt::{
+ build_mqtt_options, disconnect_mqtt, pick_first_available,
+ read_values_and_update_properties_immediately, wait_for_serial,
+};
+use rumqttc::AsyncClient;
+use tokio::sync::RwLock;
+use tokio::task::JoinHandle;
+use tokio::time;
+use tokio_util::sync::CancellationToken;
+use tracing::{error, info, warn};
+
+const MAX_CONFIG_LISTENER_BACKOFF_SECS: u64 = 30;
+
+async fn refresh_data(
+ cfg: &Config,
+ conn: &zbus::Connection,
+ dbus_state: &dbus::SharedState,
+) -> Result<()> {
+ let (host, port) = pick_first_available(&cfg.endpoints).await?;
+ info!("Connecting to MQTT {}:{} as {}", host, port, cfg.client_id);
+
+ // Set connected to true when we found a reachable host
+ update_connected_status(conn, dbus_state, true).await?;
+
+ let mqttoptions = build_mqtt_options(cfg, &host, port);
+ let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);
+
+ // Try cache for serial first
+ let mut serial: Option<String> = match read_serial_cache().await {
+ Ok(Some(s)) => {
+ info!("Using cached serial: {}", s);
+ Some(s)
+ }
+ Ok(None) => None,
+ Err(e) => {
+ warn!("Failed reading cache: {}", e);
+ None
+ }
+ };
+
+ if serial.is_none() {
+ info!("Waiting for serial on N/+/system/0/Serial...");
+ let s = wait_for_serial(&client, &mut eventloop).await?;
+ // Long TTL 30 days
+ if let Err(e) = write_serial_cache(&s, Duration::from_secs(30 * 24 * 3600)).await {
+ warn!("Failed to write cache: {}", e);
+ }
+ serial = Some(s);
+ }
+
+ let serial = serial.expect("serial should be set");
+
+ // Read values and update DBus properties immediately as each MQTT message arrives
+ let result = read_values_and_update_properties_immediately(
+ &client,
+ &mut eventloop,
+ &serial,
+ conn,
+ dbus_state,
+ )
+ .await;
+
+ disconnect_mqtt(&client).await;
+
+ match result? {
+ true => {
+ info!("Completed refresh cycle - all properties updated");
+ }
+ false => {
+ warn!("Refresh cycle completed with partial data - some MQTT topics timed out");
+ }
+ }
+
+ Ok(())
+}
+
+fn spawn_config_listener(
+ conn: zbus::Connection,
+ config_state: Arc<RwLock<Config>>,
+ shared_state: dbus::SharedState,
+ token: CancellationToken,
+) -> JoinHandle<()> {
+ tokio::spawn(async move {
+ if let Err(e) = listen_for_config_changes(&conn, config_state, &shared_state, token).await {
+ error!("Config change listener failed: {}", e);
+ }
+ })
+}
+
+#[tokio::main]
+async fn main() -> Result<()> {
+ // Initialize tracing early so debug logs from config loading are visible
+ let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
+ .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"));
+ let _ = tracing::subscriber::set_global_default(
+ tracing_subscriber::fmt()
+ .with_env_filter(env_filter)
+ .with_target(false)
+ .finish(),
+ );
+
+ let initial_cfg = load_config().await;
+
+ // Start D-Bus service and keep connection/state alive
+ let (dbus_conn, dbus_state) = start_service().await?;
+
+ // Create shared config state
+ let config_state = Arc::new(RwLock::new(initial_cfg));
+
+ // Graceful shutdown via CancellationToken
+ let shutdown_token = CancellationToken::new();
+ let ctrl_c_token = shutdown_token.clone();
+ tokio::spawn(async move {
+ if let Err(e) = tokio::signal::ctrl_c().await {
+ error!("Failed to listen for ctrl-c: {}", e);
+ return;
+ }
+ info!("Received shutdown signal, initiating graceful shutdown");
+ ctrl_c_token.cancel();
+ });
+
+ info!(
+ "Starting camper-widget-refresh with refresh interval: {} seconds",
+ config_state.read().await.refresh_interval_seconds
+ );
+
+ // Spawn the config change listener with supervision
+ let mut config_listener_handle = spawn_config_listener(
+ dbus_conn.clone(),
+ config_state.clone(),
+ dbus_state.clone(),
+ shutdown_token.child_token(),
+ );
+ let mut config_backoff_secs: u64 = 1;
+
+ // Run initial refresh
+ {
+ let cfg = config_state.read().await;
+ if let Err(e) = refresh_data(&cfg, &dbus_conn, &dbus_state).await {
+ warn!("Initial refresh failed: {}", e);
+ // Set connected to false when refresh fails
+ if let Err(update_err) = update_connected_status(&dbus_conn, &dbus_state, false).await {
+ warn!("Failed to update connected status: {}", update_err);
+ }
+ }
+ }
+
+ // Run refresh loop with dynamic interval
+ while !shutdown_token.is_cancelled() {
+ // Supervise config listener — respawn with backoff if it died
+ if config_listener_handle.is_finished() && !shutdown_token.is_cancelled() {
+ error!(
+ "Config listener exited unexpectedly, respawning in {}s",
+ config_backoff_secs
+ );
+ time::sleep(Duration::from_secs(config_backoff_secs)).await;
+ config_backoff_secs = (config_backoff_secs * 2).min(MAX_CONFIG_LISTENER_BACKOFF_SECS);
+ config_listener_handle = spawn_config_listener(
+ dbus_conn.clone(),
+ config_state.clone(),
+ dbus_state.clone(),
+ shutdown_token.child_token(),
+ );
+ }
+
+ let refresh_interval = {
+ let cfg = config_state.read().await;
+ Duration::from_secs(cfg.refresh_interval_seconds)
+ };
+
+ // Race sleep against shutdown
+ tokio::select! {
+ _ = time::sleep(refresh_interval) => {}
+ _ = shutdown_token.cancelled() => break,
+ }
+
+ let cfg = config_state.read().await;
+ match refresh_data(&cfg, &dbus_conn, &dbus_state).await {
+ Ok(()) => {
+ // Successful refresh — reset config listener backoff
+ config_backoff_secs = 1;
+ }
+ Err(e) => {
+ warn!("Refresh failed: {}", e);
+ if let Err(update_err) =
+ update_connected_status(&dbus_conn, &dbus_state, false).await
+ {
+ warn!("Failed to update connected status: {}", update_err);
+ }
+ }
+ }
+ }
+
+ info!("Shutting down cleanly");
+ Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+
+ // parse_numeric_value tests moved to victron_mqtt.rs
+
+ #[test]
+ fn test_battery_direction_mapping() {
+ // Test the direction mapping logic
+ let test_cases = vec![
+ (10.0, "charging"),
+ (-10.0, "discharging"),
+ (0.0, "idle"),
+ (3.0, "idle"), // Below threshold
+ (-3.0, "idle"), // Above negative threshold
+ (5.1, "charging"), // Just above threshold
+ (-5.1, "discharging"), // Just below negative threshold
+ ];
+
+ for (power, expected) in test_cases {
+ let direction = if power > 5.0 {
+ "charging".to_string()
+ } else if power < -5.0 {
+ "discharging".to_string()
+ } else {
+ "idle".to_string()
+ };
+
+ assert_eq!(
+ direction, expected,
+ "Power {power} should map to {expected}"
+ );
+ }
+ }
+}
diff --git a/service/src/victron_mqtt.rs b/service/src/victron_mqtt.rs
new file mode 100644
index 0000000..d72ddb3
--- /dev/null
+++ b/service/src/victron_mqtt.rs
@@ -0,0 +1,576 @@
+use std::time::Duration;
+
+use crate::config::Config;
+use crate::dbus::{PropertyType, SharedState, update_battery_state_str, update_single_property};
+use anyhow::{Result, anyhow};
+use rumqttc::{AsyncClient, Event, EventLoop, Incoming, MqttOptions, QoS};
+use tokio::{net::TcpStream, select, time};
+use tracing::{debug, info, warn};
+use zbus::Connection;
+
+// Topic-to-property mapping structure (for simple 1:1 topic→property mappings)
+#[derive(Debug, Clone)]
+pub struct TopicPropertyMapping {
+ pub topic_pattern: String,
+ pub property_type: PropertyType,
+}
+
+impl TopicPropertyMapping {
+ pub fn new(topic_pattern: String, property_type: PropertyType) -> Self {
+ Self {
+ topic_pattern,
+ property_type,
+ }
+ }
+}
+
+// Collection of all topic mappings for a given serial
+pub struct MqttPropertyMappings {
+ mappings: Vec<TopicPropertyMapping>,
+ batteries_topic: String, // system/0/Batteries — yields SoC, Power, State
+}
+
+impl MqttPropertyMappings {
+ pub fn new(serial: &str) -> Self {
+ Self {
+ mappings: vec![
+ TopicPropertyMapping::new(
+ format!("N/{serial}/solarcharger/277/Yield/Power"),
+ PropertyType::SolarPower,
+ ),
+ TopicPropertyMapping::new(
+ format!("N/{serial}/system/0/Ac/Grid/L1/Power"),
+ PropertyType::AcInputPower,
+ ),
+ TopicPropertyMapping::new(
+ format!("N/{serial}/system/0/Ac/Consumption/L1/Power"),
+ PropertyType::AcLoadPower,
+ ),
+ ],
+ batteries_topic: format!("N/{serial}/system/0/Batteries"),
+ }
+ }
+
+ // Find mapping for a given topic (simple 1:1 mappings only)
+ pub fn find_mapping(&self, topic: &str) -> Option<&TopicPropertyMapping> {
+ self.mappings.iter().find(|m| m.topic_pattern == topic)
+ }
+
+ // Check if topic is the composite Batteries topic
+ pub fn is_batteries_topic(&self, topic: &str) -> bool {
+ self.batteries_topic == topic
+ }
+
+ // Get all topic patterns for subscription
+ pub fn get_subscription_topics(&self) -> Vec<&str> {
+ let mut topics: Vec<&str> = self
+ .mappings
+ .iter()
+ .map(|m| m.topic_pattern.as_str())
+ .collect();
+ topics.push(&self.batteries_topic);
+ topics
+ }
+}
+
+/// Parse the system/0/Batteries JSON payload.
+/// Returns (soc, power, state_string) from the first battery entry.
+pub fn parse_batteries_payload(payload: &str) -> Option<(f64, f64, String)> {
+ let json: serde_json::Value = serde_json::from_str(payload).ok()?;
+ let batteries = json.get("value")?.as_array()?;
+ let battery = batteries.first()?;
+ let soc = battery.get("soc")?.as_f64()?;
+ let power = battery.get("power")?.as_f64()?;
+ let state_num = battery.get("state")?.as_i64().unwrap_or(0);
+ let state_str = match state_num {
+ 1 => "charging",
+ 2 => "discharging",
+ _ => "idle",
+ };
+ Some((soc, power, state_str.to_string()))
+}
+
+// Parse numeric value from MQTT payload (moved from main.rs)
+fn parse_numeric_value(s: Option<&String>) -> f64 {
+ if let Some(raw) = s {
+ // First try direct number parsing
+ if let Ok(v) = raw.trim().parse::<f64>() {
+ return v;
+ }
+ // Try JSON object with {"value": number}
+ if let Ok(val) = serde_json::from_str::<serde_json::Value>(raw)
+ && let Some(v) = val.get("value")
+ {
+ if let Some(n) = v.as_f64() {
+ return n;
+ }
+ if let Some(s) = v.as_str()
+ && let Ok(n) = s.trim().parse::<f64>()
+ {
+ return n;
+ }
+ }
+ }
+ 0.0
+}
+
+// Returns Some(serial) when topic is like "N/{serial}/system/0/Serial"
+pub fn extract_serial_from_topic(topic: &str) -> Option<String> {
+ if topic.starts_with("N/") && topic.ends_with("/system/0/Serial") {
+ let parts: Vec<&str> = topic.split('/').collect();
+ if parts.len() >= 3 {
+ return Some(parts[1].to_string());
+ }
+ }
+ None
+}
+
+pub async fn disconnect_mqtt(client: &AsyncClient) {
+ if let Err(e) = client.disconnect().await {
+ warn!("MQTT disconnect failed: {}", e);
+ }
+}
+
+pub fn build_mqtt_options(cfg: &Config, host: &str, port: u16) -> MqttOptions {
+ let mut opts = MqttOptions::new(&cfg.client_id, host, port);
+ opts.set_keep_alive(Duration::from_secs(10));
+ if let (Some(u), Some(p)) = (&cfg.username, &cfg.password) {
+ opts.set_credentials(u, p);
+ }
+ opts
+}
+
+pub async fn pick_first_available(endpoints: &[(String, u16)]) -> Result<(String, u16)> {
+ for (host, port) in endpoints {
+ let addr = (host.as_str(), *port);
+ let attempt = time::timeout(Duration::from_secs(2), TcpStream::connect(addr)).await;
+ match attempt {
+ Ok(Ok(stream)) => {
+ // Successfully connected; close and return this endpoint
+ drop(stream);
+ return Ok((host.clone(), *port));
+ }
+ _ => {
+ // Try next
+ }
+ }
+ }
+ Err(anyhow!("No reachable MQTT endpoint from config"))
+}
+
+pub async fn wait_for_serial(client: &AsyncClient, eventloop: &mut EventLoop) -> Result<String> {
+ // Subscribe to N/+/system/0/Serial
+ client
+ .subscribe("N/+/system/0/Serial", QoS::AtLeastOnce)
+ .await?;
+ loop {
+ match eventloop.poll().await? {
+ Event::Incoming(Incoming::Publish(p)) => {
+ let topic = p.topic.clone();
+ if let Some(serial) = extract_serial_from_topic(&topic) {
+ info!("Detected serial from topic: {}", serial);
+ return Ok(serial);
+ }
+ }
+ _ => {
+ // Ignore other events
+ }
+ }
+ }
+}
+
+/// Read values and update DBus properties immediately as each message arrives.
+/// Returns `Ok(true)` if all topics were received, `Ok(false)` on timeout with partial data.
+pub async fn read_values_and_update_properties_immediately(
+ client: &AsyncClient,
+ eventloop: &mut EventLoop,
+ serial: &str,
+ conn: &Connection,
+ dbus_state: &SharedState,
+) -> Result<bool> {
+ let mappings = MqttPropertyMappings::new(serial);
+ let topics = mappings.get_subscription_topics();
+
+ // Subscribe to all topics
+ for topic in &topics {
+ client.subscribe(*topic, QoS::AtLeastOnce).await?;
+ }
+ debug!(
+ "Subscribed to {} topics for serial={}",
+ topics.len(),
+ serial
+ );
+
+ let mut received_topics = std::collections::HashSet::new();
+ let total_topics = topics.len();
+
+ // We'll wait up to 30 seconds for all values
+ let timeout = time::sleep(Duration::from_secs(30));
+ tokio::pin!(timeout);
+
+ let all_received = loop {
+ select! {
+ _ = &mut timeout => {
+ warn!("Timeout while waiting for all values, received {} of {}", received_topics.len(), total_topics);
+ break false;
+ }
+ ev = eventloop.poll() => {
+ match ev? {
+ Event::Incoming(Incoming::Publish(p)) => {
+ let topic = p.topic.clone();
+ let payload_str = String::from_utf8_lossy(&p.payload).to_string();
+
+ if let Some(mapping) = mappings.find_mapping(&topic) {
+ let value = parse_numeric_value(Some(&payload_str));
+ update_single_property(conn, dbus_state, &mapping.property_type, value).await?;
+ received_topics.insert(topic.clone());
+ info!("Updated {:?} with value {} ({} of {} topics received)",
+ mapping.property_type, value, received_topics.len(), total_topics);
+ } else if mappings.is_batteries_topic(&topic)
+ && let Some((soc, power, state_str)) = parse_batteries_payload(&payload_str)
+ {
+ update_single_property(conn, dbus_state, &PropertyType::BatterySoc, soc).await?;
+ update_single_property(conn, dbus_state, &PropertyType::BatteryPower, power).await?;
+ update_battery_state_str(conn, dbus_state, &state_str).await?;
+ received_topics.insert(topic.clone());
+ info!("Updated battery from Batteries topic: soc={}, power={}, state={} ({} of {} topics received)",
+ soc, power, state_str, received_topics.len(), total_topics);
+ }
+
+ if received_topics.len() >= total_topics {
+ info!("All {} properties updated successfully", total_topics);
+ break true;
+ }
+ }
+ _ => {
+ // Ignore other events (ConnAck, SubAck, etc.)
+ }
+ }
+ }
+ }
+ };
+
+ Ok(all_received)
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use tokio::net::TcpListener;
+
+ #[test]
+ fn test_extract_serial_from_topic() {
+ assert_eq!(
+ extract_serial_from_topic("N/abc123/system/0/Serial"),
+ Some("abc123".to_string())
+ );
+ assert_eq!(
+ extract_serial_from_topic("N/xyz/system/0/Serial"),
+ Some("xyz".to_string())
+ );
+ assert_eq!(
+ extract_serial_from_topic("N//system/0/Serial"),
+ Some("".to_string())
+ );
+ assert_eq!(extract_serial_from_topic("N/abc123/system/0/Other"), None);
+ assert_eq!(extract_serial_from_topic("foo/bar"), None);
+ }
+
+ #[tokio::test]
+ async fn test_pick_first_available_picks_open_second() {
+ // Start a listener for the second endpoint only
+ let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+ let addr = listener.local_addr().unwrap();
+
+ let endpoints = vec![
+ ("127.0.0.1".to_string(), 1), // very likely closed
+ (addr.ip().to_string(), addr.port()),
+ ];
+
+ let chosen = pick_first_available(&endpoints).await.unwrap();
+ assert_eq!(chosen, (addr.ip().to_string(), addr.port()));
+
+ drop(listener);
+ }
+
+ #[tokio::test]
+ async fn test_pick_first_available_prefers_first_when_both_open() {
+ let listener1 = TcpListener::bind("127.0.0.1:0").await.unwrap();
+ let addr1 = listener1.local_addr().unwrap();
+ let listener2 = TcpListener::bind("127.0.0.1:0").await.unwrap();
+ let addr2 = listener2.local_addr().unwrap();
+
+ let endpoints = vec![
+ (addr1.ip().to_string(), addr1.port()),
+ (addr2.ip().to_string(), addr2.port()),
+ ];
+
+ let chosen = pick_first_available(&endpoints).await.unwrap();
+ assert_eq!(chosen, (addr1.ip().to_string(), addr1.port()));
+
+ drop(listener1);
+ drop(listener2);
+ }
+
+ #[test]
+ fn test_parse_numeric_value_direct() {
+ let value = "123.45".to_string();
+ let input = Some(&value);
+ assert_eq!(parse_numeric_value(input), 123.45);
+ }
+
+ #[test]
+ fn test_parse_numeric_value_json() {
+ let value = r#"{"value": 67.89}"#.to_string();
+ let input = Some(&value);
+ assert_eq!(parse_numeric_value(input), 67.89);
+ }
+
+ #[test]
+ fn test_parse_numeric_value_json_string() {
+ let value = r#"{"value": "90.12"}"#.to_string();
+ let input = Some(&value);
+ assert_eq!(parse_numeric_value(input), 90.12);
+ }
+
+ #[test]
+ fn test_parse_numeric_value_invalid() {
+ let value = "invalid".to_string();
+ let input = Some(&value);
+ assert_eq!(parse_numeric_value(input), 0.0);
+ }
+
+ #[test]
+ fn test_parse_numeric_value_none() {
+ let input = None;
+ assert_eq!(parse_numeric_value(input), 0.0);
+ }
+
+ #[test]
+ fn test_parse_numeric_value_whitespace() {
+ let value = " 42.5 ".to_string();
+ let input = Some(&value);
+ assert_eq!(parse_numeric_value(input), 42.5);
+ }
+
+ #[test]
+ fn test_topic_property_mapping() {
+ let mappings = MqttPropertyMappings::new("test123");
+ let topics = mappings.get_subscription_topics();
+
+ assert_eq!(topics.len(), 4);
+ assert!(topics.contains(&"N/test123/solarcharger/277/Yield/Power"));
+ assert!(topics.contains(&"N/test123/system/0/Ac/Grid/L1/Power"));
+ assert!(topics.contains(&"N/test123/system/0/Ac/Consumption/L1/Power"));
+ assert!(topics.contains(&"N/test123/system/0/Batteries"));
+
+ // Test finding solar mapping
+ let solar_mapping = mappings.find_mapping("N/test123/solarcharger/277/Yield/Power");
+ assert!(solar_mapping.is_some());
+ assert_eq!(
+ solar_mapping.unwrap().property_type,
+ PropertyType::SolarPower
+ );
+
+ // Test finding AC input mapping
+ let ac_input_mapping = mappings.find_mapping("N/test123/system/0/Ac/Grid/L1/Power");
+ assert!(ac_input_mapping.is_some());
+ assert_eq!(
+ ac_input_mapping.unwrap().property_type,
+ PropertyType::AcInputPower
+ );
+
+ // Test finding AC load mapping
+ let ac_load_mapping = mappings.find_mapping("N/test123/system/0/Ac/Consumption/L1/Power");
+ assert!(ac_load_mapping.is_some());
+ assert_eq!(
+ ac_load_mapping.unwrap().property_type,
+ PropertyType::AcLoadPower
+ );
+
+ // Batteries topic is handled specially, not in find_mapping
+ assert!(
+ mappings
+ .find_mapping("N/test123/system/0/Batteries")
+ .is_none()
+ );
+ assert!(mappings.is_batteries_topic("N/test123/system/0/Batteries"));
+ }
+
+ // --- parse_numeric_value: fallback chain and design decisions ---
+
+ #[test]
+ fn test_parse_numeric_value_negative() {
+ let value = "-123.45".to_string();
+ assert_eq!(parse_numeric_value(Some(&value)), -123.45);
+ }
+
+ #[test]
+ fn test_parse_numeric_value_json_negative() {
+ let value = r#"{"value": -67.89}"#.to_string();
+ assert_eq!(parse_numeric_value(Some(&value)), -67.89);
+ }
+
+ #[test]
+ fn test_parse_numeric_value_json_null() {
+ let value = r#"{"value": null}"#.to_string();
+ assert_eq!(parse_numeric_value(Some(&value)), 0.0);
+ }
+
+ #[test]
+ fn test_parse_numeric_value_json_missing_value_key() {
+ let value = r#"{"power": 42}"#.to_string();
+ assert_eq!(parse_numeric_value(Some(&value)), 0.0);
+ }
+
+ #[test]
+ fn test_parse_numeric_value_empty_string() {
+ let value = "".to_string();
+ assert_eq!(parse_numeric_value(Some(&value)), 0.0);
+ }
+
+ #[test]
+ fn test_parse_numeric_value_json_with_extra_fields() {
+ let value = r#"{"value": 42, "unit": "W"}"#.to_string();
+ assert_eq!(parse_numeric_value(Some(&value)), 42.0);
+ }
+
+ #[test]
+ fn test_parse_numeric_value_json_non_numeric_string() {
+ let value = r#"{"value": "n/a"}"#.to_string();
+ assert_eq!(parse_numeric_value(Some(&value)), 0.0);
+ }
+
+ // --- extract_serial_from_topic: boundary cases ---
+
+ #[test]
+ fn test_extract_serial_empty_topic() {
+ assert_eq!(extract_serial_from_topic(""), None);
+ }
+
+ #[test]
+ fn test_extract_serial_data_topic_rejected() {
+ // Data topics must not be mistaken for serial discovery topics
+ assert_eq!(
+ extract_serial_from_topic("N/abc/battery/278/Dc/0/Power"),
+ None
+ );
+ }
+
+ // --- MqttPropertyMappings: Victron device mapping design ---
+
+ #[test]
+ fn test_mapping_find_returns_none_for_unknown_topic() {
+ let mappings = MqttPropertyMappings::new("serial1");
+ assert!(mappings.find_mapping("N/serial1/unknown/0/Foo").is_none());
+ }
+
+ #[test]
+ fn test_mapping_solar_power_exists() {
+ let mappings = MqttPropertyMappings::new("s1");
+ let solar = mappings
+ .find_mapping("N/s1/solarcharger/277/Yield/Power")
+ .expect("solar mapping should exist");
+ assert_eq!(solar.property_type, PropertyType::SolarPower);
+ }
+
+ #[test]
+ fn test_mapping_different_serials_dont_cross_match() {
+ let mappings_a = MqttPropertyMappings::new("AAA");
+ let mappings_b = MqttPropertyMappings::new("BBB");
+ // Simple mappings from serial AAA must not match in serial BBB
+ for topic in mappings_a.get_subscription_topics() {
+ assert!(
+ mappings_b.find_mapping(topic).is_none(),
+ "serial BBB should not match topic from serial AAA: {topic}"
+ );
+ if mappings_a.is_batteries_topic(topic) {
+ assert!(
+ !mappings_b.is_batteries_topic(topic),
+ "serial BBB batteries topic should not match serial AAA: {topic}"
+ );
+ }
+ }
+ }
+
+ #[test]
+ fn test_parse_batteries_payload_charging() {
+ let payload = r#"{"value":[{"active_battery_service":true,"current":1.8,"id":"com.victronenergy.battery.ttyS7","instance":279,"name":"Shunt","power":48.13,"soc":67.1,"state":1,"voltage":26.74}]}"#;
+ let (soc, power, state) = parse_batteries_payload(payload).unwrap();
+ assert!((soc - 67.1).abs() < 0.01);
+ assert!((power - 48.13).abs() < 0.01);
+ assert_eq!(state, "charging");
+ }
+
+ #[test]
+ fn test_parse_batteries_payload_discharging() {
+ let payload = r#"{"value":[{"soc":50.0,"power":-334.78,"state":2}]}"#;
+ let (soc, power, state) = parse_batteries_payload(payload).unwrap();
+ assert!((soc - 50.0).abs() < 0.01);
+ assert!((power - (-334.78)).abs() < 0.01);
+ assert_eq!(state, "discharging");
+ }
+
+ #[test]
+ fn test_parse_batteries_payload_idle() {
+ let payload = r#"{"value":[{"soc":100.0,"power":0.0,"state":0}]}"#;
+ let (soc, _power, state) = parse_batteries_payload(payload).unwrap();
+ assert!((soc - 100.0).abs() < 0.01);
+ assert_eq!(state, "idle");
+ }
+
+ #[test]
+ fn test_parse_batteries_payload_invalid() {
+ assert!(parse_batteries_payload("not json").is_none());
+ assert!(parse_batteries_payload(r#"{"value":[]}"#).is_none());
+ assert!(parse_batteries_payload(r#"{"value":"nope"}"#).is_none());
+ }
+
+ #[test]
+ fn test_mapping_topics_embed_serial() {
+ let serial = "mySerial42";
+ let mappings = MqttPropertyMappings::new(serial);
+ for topic in mappings.get_subscription_topics() {
+ assert!(
+ topic.contains(serial),
+ "topic should contain serial: {topic}"
+ );
+ }
+ }
+
+ // --- build_mqtt_options: configuration policy ---
+
+ #[test]
+ fn test_build_mqtt_options_applies_credentials_when_present() {
+ let cfg = Config {
+ endpoints: vec![("127.0.0.1".to_string(), 1883)],
+ username: Some("user".to_string()),
+ password: Some("pass".to_string()),
+ client_id: "test-client".to_string(),
+ refresh_interval_seconds: 60,
+ };
+ let opts = build_mqtt_options(&cfg, "127.0.0.1", 1883);
+ let login = opts.credentials().expect("credentials should be set");
+ assert_eq!(login.username, "user");
+ assert_eq!(login.password, "pass");
+ assert_eq!(opts.keep_alive(), Duration::from_secs(10));
+ }
+
+ #[test]
+ fn test_build_mqtt_options_skips_credentials_when_absent() {
+ let cfg = Config {
+ endpoints: vec![("127.0.0.1".to_string(), 1883)],
+ username: None,
+ password: None,
+ client_id: "anon-client".to_string(),
+ refresh_interval_seconds: 60,
+ };
+ let opts = build_mqtt_options(&cfg, "127.0.0.1", 1883);
+ assert!(
+ opts.credentials().is_none(),
+ "anonymous mode should have no credentials"
+ );
+ assert_eq!(opts.client_id(), "anon-client");
+ assert_eq!(opts.keep_alive(), Duration::from_secs(10));
+ }
+}