use std::sync::Arc; use anyhow::Result; use tokio::sync::{Notify, RwLock, Semaphore}; use tracing::{error, warn}; use zbus::{Connection, connection, interface}; // Object paths pub const ROOT_OBJECT_PATH: &str = "/org/craftknight/camper_widget"; pub const BATTERY_OBJECT_PATH: &str = "/org/craftknight/camper_widget/Battery"; pub const SOLAR_OBJECT_PATH: &str = "/org/craftknight/camper_widget/Solar"; pub const AC_INPUT_OBJECT_PATH: &str = "/org/craftknight/camper_widget/AcInput"; pub const AC_LOAD_OBJECT_PATH: &str = "/org/craftknight/camper_widget/AcLoad"; #[derive(Clone)] pub struct SharedState { // Status interface data pub connected: Arc>, // Battery interface data pub battery_soc: Arc>, pub battery_power: Arc>, pub battery_state: Arc>, // Solar interface data pub solar_power: Arc>, // AC input interface data pub ac_input_power: Arc>, // AC load interface data pub ac_load_power: Arc>, // Config reload notification pub config_reload_notify: Arc, // Backpressure for D-Bus signal emission pub signal_semaphore: Arc, } impl Default for SharedState { fn default() -> Self { Self { connected: Arc::new(RwLock::new(false)), battery_soc: Arc::new(RwLock::new(0.0)), battery_power: Arc::new(RwLock::new(0.0)), battery_state: Arc::new(RwLock::new("idle".to_string())), solar_power: Arc::new(RwLock::new(0.0)), ac_input_power: Arc::new(RwLock::new(0.0)), ac_load_power: Arc::new(RwLock::new(0.0)), config_reload_notify: Arc::new(Notify::new()), signal_semaphore: Arc::new(Semaphore::new(10)), } } } #[derive(Clone)] pub struct StatusInterface { state: SharedState, } impl StatusInterface { fn new(state: SharedState) -> Self { Self { state } } } #[derive(Clone)] pub struct BatteryInterface { state: SharedState, } impl BatteryInterface { fn new(state: SharedState) -> Self { Self { state } } } #[derive(Clone)] pub struct SolarInterface { state: SharedState, } impl SolarInterface { fn new(state: SharedState) -> Self { Self { state } } } #[derive(Clone)] pub struct AcInputInterface { state: SharedState, } impl AcInputInterface { fn new(state: SharedState) -> Self { Self { state } } } #[derive(Clone)] pub struct AcLoadInterface { state: SharedState, } impl AcLoadInterface { fn new(state: SharedState) -> Self { Self { state } } } #[derive(Clone)] pub struct ConfigInterface { #[allow(dead_code)] state: SharedState, } impl ConfigInterface { fn new(state: SharedState) -> Self { Self { state } } } // Status interface - Connected property #[interface(name = "org.craftknight.CamperWidget.Status")] impl StatusInterface { #[zbus(property(emits_changed_signal = "true"))] async fn connected(&self) -> bool { *self.state.connected.read().await } } // Battery interface - SoC, Power, State properties #[interface(name = "org.craftknight.CamperWidget.Battery")] impl BatteryInterface { #[zbus(property(emits_changed_signal = "true"))] async fn soc(&self) -> f64 { *self.state.battery_soc.read().await } #[zbus(property(emits_changed_signal = "true"))] async fn power(&self) -> f64 { *self.state.battery_power.read().await } #[zbus(property(emits_changed_signal = "true"))] async fn state(&self) -> String { self.state.battery_state.read().await.clone() } } // Solar interface - Power property #[interface(name = "org.craftknight.CamperWidget.Solar")] impl SolarInterface { #[zbus(property(emits_changed_signal = "true"))] async fn power(&self) -> f64 { *self.state.solar_power.read().await } } // AC input interface - Power property #[interface(name = "org.craftknight.CamperWidget.AcInput")] impl AcInputInterface { #[zbus(property(emits_changed_signal = "true"))] async fn power(&self) -> f64 { *self.state.ac_input_power.read().await } } // AC load interface - Power property #[interface(name = "org.craftknight.CamperWidget.AcLoad")] impl AcLoadInterface { #[zbus(property(emits_changed_signal = "true"))] async fn power(&self) -> f64 { *self.state.ac_load_power.read().await } } // Config interface for handling configuration reloads #[interface(name = "org.craftknight.CamperWidget.Config")] impl ConfigInterface { /// Reload configuration method async fn reload(&self) -> zbus::fdo::Result<()> { self.state.config_reload_notify.notify_one(); Ok(()) } } // Start a session bus service and return the connection; keep it alive in main pub async fn start_service() -> Result<(Connection, SharedState)> { let shared = SharedState::default(); let status_server = StatusInterface::new(shared.clone()); let battery_server = BatteryInterface::new(shared.clone()); let solar_server = SolarInterface::new(shared.clone()); let ac_input_server = AcInputInterface::new(shared.clone()); let ac_load_server = AcLoadInterface::new(shared.clone()); let config_server = ConfigInterface::new(shared.clone()); let conn = connection::Builder::session()? .name("org.craftknight.CamperWidget")? .serve_at(ROOT_OBJECT_PATH, status_server)? .serve_at(ROOT_OBJECT_PATH, config_server)? .serve_at(BATTERY_OBJECT_PATH, battery_server)? .serve_at(SOLAR_OBJECT_PATH, solar_server)? .serve_at(AC_INPUT_OBJECT_PATH, ac_input_server)? .serve_at(AC_LOAD_OBJECT_PATH, ac_load_server)? .build() .await?; Ok((conn, shared)) } /// Start a D-Bus service at a custom address (for integration testing). pub async fn start_service_at(address: &str) -> Result<(Connection, SharedState)> { let shared = SharedState::default(); let status_server = StatusInterface::new(shared.clone()); let battery_server = BatteryInterface::new(shared.clone()); let solar_server = SolarInterface::new(shared.clone()); let ac_input_server = AcInputInterface::new(shared.clone()); let ac_load_server = AcLoadInterface::new(shared.clone()); let config_server = ConfigInterface::new(shared.clone()); let conn = connection::Builder::address(address)? .name("org.craftknight.CamperWidget")? .serve_at(ROOT_OBJECT_PATH, status_server)? .serve_at(ROOT_OBJECT_PATH, config_server)? .serve_at(BATTERY_OBJECT_PATH, battery_server)? .serve_at(SOLAR_OBJECT_PATH, solar_server)? .serve_at(AC_INPUT_OBJECT_PATH, ac_input_server)? .serve_at(AC_LOAD_OBJECT_PATH, ac_load_server)? .build() .await?; Ok((conn, shared)) } // Property types for individual updates #[derive(Debug, Clone, PartialEq)] pub enum PropertyType { BatterySoc, BatteryPower, BatteryState, SolarPower, AcInputPower, AcLoadPower, } // Update connected status and emit signal pub async fn update_connected_status( conn: &Connection, state: &SharedState, connected: bool, ) -> Result<()> { let object_server = conn.object_server(); let mut guard = state.connected.write().await; if *guard != connected { *guard = connected; // Emit signal for connected status asynchronously let object_server_clone = object_server.clone(); match state.signal_semaphore.clone().try_acquire_owned() { Ok(permit) => { tokio::spawn(async move { let _permit = permit; if let Ok(iface_ref) = object_server_clone .interface::<_, StatusInterface>(ROOT_OBJECT_PATH) .await { let iface = iface_ref.get_mut().await; if let Err(e) = iface.connected_changed(iface_ref.signal_emitter()).await { error!("Connected: Failed to emit signal: {}", e); } } else { error!("Connected: Failed to get interface at {}", ROOT_OBJECT_PATH); } }); } Err(_) => { warn!("Signal emission backpressure: dropping signal for Connected"); } } } Ok(()) } // Update battery state from a string directly (used by Batteries topic) pub async fn update_battery_state_str( conn: &Connection, state: &SharedState, state_str: &str, ) -> Result<()> { let object_server = conn.object_server(); let mut guard = state.battery_state.write().await; if *guard != state_str { *guard = state_str.to_string(); let object_server_clone = object_server.clone(); match state.signal_semaphore.clone().try_acquire_owned() { Ok(permit) => { tokio::spawn(async move { let _permit = permit; if let Ok(iface_ref) = object_server_clone .interface::<_, BatteryInterface>(BATTERY_OBJECT_PATH) .await { let iface = iface_ref.get_mut().await; if let Err(e) = iface.state_changed(iface_ref.signal_emitter()).await { error!("BatteryState: Failed to emit signal: {}", e); } } }); } Err(_) => { warn!("Signal emission backpressure: dropping signal for BatteryState"); } } } Ok(()) } // Update a single DBus property immediately pub async fn update_single_property( conn: &Connection, state: &SharedState, property_type: &PropertyType, value: f64, ) -> Result<()> { let object_server = conn.object_server(); match property_type { PropertyType::BatterySoc => { let mut guard = state.battery_soc.write().await; if (*guard - value).abs() > f64::EPSILON { *guard = value; // Emit signal for battery SoC asynchronously let object_server_clone = object_server.clone(); match state.signal_semaphore.clone().try_acquire_owned() { Ok(permit) => { tokio::spawn(async move { let _permit = permit; if let Ok(iface_ref) = object_server_clone .interface::<_, BatteryInterface>(BATTERY_OBJECT_PATH) .await { let iface = iface_ref.get_mut().await; if let Err(e) = iface.soc_changed(iface_ref.signal_emitter()).await { error!("BatterySoc: Failed to emit signal: {}", e); } } else { error!( "BatterySoc: Failed to get interface at {}", BATTERY_OBJECT_PATH ); } }); } Err(_) => { warn!("Signal emission backpressure: dropping signal for BatterySoc"); } } } } PropertyType::BatteryPower => { let mut guard = state.battery_power.write().await; if (*guard - value).abs() > f64::EPSILON { *guard = value; // Emit signal for battery power asynchronously let object_server_clone = object_server.clone(); match state.signal_semaphore.clone().try_acquire_owned() { Ok(permit) => { tokio::spawn(async move { let _permit = permit; if let Ok(iface_ref) = object_server_clone .interface::<_, BatteryInterface>(BATTERY_OBJECT_PATH) .await { let iface = iface_ref.get_mut().await; if let Err(e) = iface.power_changed(iface_ref.signal_emitter()).await { error!("BatteryPower: Failed to emit signal: {}", e); } } else { error!( "BatteryPower: Failed to get interface at {}", BATTERY_OBJECT_PATH ); } }); } Err(_) => { warn!("Signal emission backpressure: dropping signal for BatteryPower"); } } } } PropertyType::BatteryState => { let state_str = if value > 5.0 { "charging".to_string() } else if value < -5.0 { "discharging".to_string() } else { "idle".to_string() }; let mut guard = state.battery_state.write().await; if *guard != state_str { *guard = state_str.clone(); // Emit signal for battery state asynchronously let object_server_clone = object_server.clone(); match state.signal_semaphore.clone().try_acquire_owned() { Ok(permit) => { tokio::spawn(async move { let _permit = permit; if let Ok(iface_ref) = object_server_clone .interface::<_, BatteryInterface>(BATTERY_OBJECT_PATH) .await { let iface = iface_ref.get_mut().await; if let Err(e) = iface.state_changed(iface_ref.signal_emitter()).await { error!("BatteryState: Failed to emit signal: {}", e); } } else { error!( "BatteryState: Failed to get interface at {}", BATTERY_OBJECT_PATH ); } }); } Err(_) => { warn!("Signal emission backpressure: dropping signal for BatteryState"); } } } } PropertyType::SolarPower => { let mut guard = state.solar_power.write().await; if (*guard - value).abs() > f64::EPSILON { *guard = value; // Emit signal for solar power asynchronously let object_server_clone = object_server.clone(); match state.signal_semaphore.clone().try_acquire_owned() { Ok(permit) => { tokio::spawn(async move { let _permit = permit; if let Ok(iface_ref) = object_server_clone .interface::<_, SolarInterface>(SOLAR_OBJECT_PATH) .await { let iface = iface_ref.get_mut().await; if let Err(e) = iface.power_changed(iface_ref.signal_emitter()).await { error!("SolarPower: Failed to emit signal: {}", e); } } else { error!( "SolarPower: Failed to get interface at {}", SOLAR_OBJECT_PATH ); } }); } Err(_) => { warn!("Signal emission backpressure: dropping signal for SolarPower"); } } } } PropertyType::AcInputPower => { let mut guard = state.ac_input_power.write().await; if (*guard - value).abs() > f64::EPSILON { *guard = value; let object_server_clone = object_server.clone(); match state.signal_semaphore.clone().try_acquire_owned() { Ok(permit) => { tokio::spawn(async move { let _permit = permit; if let Ok(iface_ref) = object_server_clone .interface::<_, AcInputInterface>(AC_INPUT_OBJECT_PATH) .await { let iface = iface_ref.get_mut().await; if let Err(e) = iface.power_changed(iface_ref.signal_emitter()).await { error!("AcInputPower: Failed to emit signal: {}", e); } } else { error!( "AcInputPower: Failed to get interface at {}", AC_INPUT_OBJECT_PATH ); } }); } Err(_) => { warn!("Signal emission backpressure: dropping signal for AcInputPower"); } } } } PropertyType::AcLoadPower => { let mut guard = state.ac_load_power.write().await; if (*guard - value).abs() > f64::EPSILON { *guard = value; let object_server_clone = object_server.clone(); match state.signal_semaphore.clone().try_acquire_owned() { Ok(permit) => { tokio::spawn(async move { let _permit = permit; if let Ok(iface_ref) = object_server_clone .interface::<_, AcLoadInterface>(AC_LOAD_OBJECT_PATH) .await { let iface = iface_ref.get_mut().await; if let Err(e) = iface.power_changed(iface_ref.signal_emitter()).await { error!("AcLoadPower: Failed to emit signal: {}", e); } } else { error!( "AcLoadPower: Failed to get interface at {}", AC_LOAD_OBJECT_PATH ); } }); } Err(_) => { warn!("Signal emission backpressure: dropping signal for AcLoadPower"); } } } } } Ok(()) } #[cfg(test)] mod tests { use super::*; #[tokio::test] async fn test_shared_state_initialization() { let state = SharedState::default(); assert!(!(*state.connected.read().await)); assert_eq!(*state.battery_soc.read().await, 0.0); assert_eq!(*state.battery_power.read().await, 0.0); assert_eq!(*state.battery_state.read().await, "idle"); assert_eq!(*state.solar_power.read().await, 0.0); assert_eq!(*state.ac_input_power.read().await, 0.0); assert_eq!(*state.ac_load_power.read().await, 0.0); } #[tokio::test] async fn test_property_updates() { let state = SharedState::default(); // Test connected status update { let mut guard = state.connected.write().await; *guard = true; } assert!(*state.connected.read().await); // Test battery property updates { let mut soc_guard = state.battery_soc.write().await; *soc_guard = 75.0; } { let mut power_guard = state.battery_power.write().await; *power_guard = -15.0; } { let mut state_guard = state.battery_state.write().await; *state_guard = "discharging".to_string(); } assert_eq!(*state.battery_soc.read().await, 75.0); assert_eq!(*state.battery_power.read().await, -15.0); assert_eq!(*state.battery_state.read().await, "discharging"); // Test solar power update { let mut solar_guard = state.solar_power.write().await; *solar_guard = 120.0; } assert_eq!(*state.solar_power.read().await, 120.0); // Test AC input power update { let mut ac_input_guard = state.ac_input_power.write().await; *ac_input_guard = 82.0; } assert_eq!(*state.ac_input_power.read().await, 82.0); // Test AC load power update { let mut ac_load_guard = state.ac_load_power.write().await; *ac_load_guard = 77.0; } assert_eq!(*state.ac_load_power.read().await, 77.0); } #[tokio::test] async fn test_interface_creation() { let state = SharedState::default(); let _status_interface = StatusInterface::new(state.clone()); let _battery_interface = BatteryInterface::new(state.clone()); let _solar_interface = SolarInterface::new(state.clone()); let _ac_input_interface = AcInputInterface::new(state.clone()); let _ac_load_interface = AcLoadInterface::new(state.clone()); let _config_interface = ConfigInterface::new(state.clone()); // Test that interfaces can be created without panicking // If we get here, creation succeeded } #[test] fn test_property_change_detection() { // Test floating point comparison logic without async let value1: f64 = 50.0; let value2: f64 = 50.1; assert!((value1 - 50.0_f64).abs() < f64::EPSILON); assert!((value2 - 50.1_f64).abs() < f64::EPSILON); assert!((value1 - value2).abs() > f64::EPSILON); } #[tokio::test] async fn test_battery_state_mapping() { // Test the battery state mapping logic from main.rs let test_cases = vec![ (10.0, "charging"), (-10.0, "discharging"), (0.0, "idle"), (3.0, "idle"), // Below threshold (-3.0, "idle"), // Above negative threshold ]; for (power, expected_state) in test_cases { let direction = if power > 5.0 { "charging".to_string() } else if power < -5.0 { "discharging".to_string() } else { "idle".to_string() }; assert_eq!( direction, expected_state, "Power {power} should map to {expected_state}" ); } } }