diff options
Diffstat (limited to 'service/src/dbus.rs')
| -rw-r--r-- | service/src/dbus.rs | 667 |
1 files changed, 667 insertions, 0 deletions
diff --git a/service/src/dbus.rs b/service/src/dbus.rs new file mode 100644 index 0000000..2a07134 --- /dev/null +++ b/service/src/dbus.rs @@ -0,0 +1,667 @@ +use std::sync::Arc; + +use anyhow::Result; +use tokio::sync::{Notify, RwLock, Semaphore}; + +use tracing::{error, warn}; +use zbus::{Connection, connection, interface}; + +// Object paths +pub const ROOT_OBJECT_PATH: &str = "/org/craftknight/camper_widget"; +pub const BATTERY_OBJECT_PATH: &str = "/org/craftknight/camper_widget/Battery"; +pub const SOLAR_OBJECT_PATH: &str = "/org/craftknight/camper_widget/Solar"; +pub const AC_INPUT_OBJECT_PATH: &str = "/org/craftknight/camper_widget/AcInput"; +pub const AC_LOAD_OBJECT_PATH: &str = "/org/craftknight/camper_widget/AcLoad"; + +#[derive(Clone)] +pub struct SharedState { + // Status interface data + pub connected: Arc<RwLock<bool>>, + + // Battery interface data + pub battery_soc: Arc<RwLock<f64>>, + pub battery_power: Arc<RwLock<f64>>, + pub battery_state: Arc<RwLock<String>>, + + // Solar interface data + pub solar_power: Arc<RwLock<f64>>, + + // AC input interface data + pub ac_input_power: Arc<RwLock<f64>>, + + // AC load interface data + pub ac_load_power: Arc<RwLock<f64>>, + + // Config reload notification + pub config_reload_notify: Arc<Notify>, + + // Backpressure for D-Bus signal emission + pub signal_semaphore: Arc<Semaphore>, +} + +impl Default for SharedState { + fn default() -> Self { + Self { + connected: Arc::new(RwLock::new(false)), + battery_soc: Arc::new(RwLock::new(0.0)), + battery_power: Arc::new(RwLock::new(0.0)), + battery_state: Arc::new(RwLock::new("idle".to_string())), + solar_power: Arc::new(RwLock::new(0.0)), + ac_input_power: Arc::new(RwLock::new(0.0)), + ac_load_power: Arc::new(RwLock::new(0.0)), + config_reload_notify: Arc::new(Notify::new()), + signal_semaphore: Arc::new(Semaphore::new(10)), + } + } +} + +#[derive(Clone)] +pub struct StatusInterface { + state: SharedState, +} + +impl StatusInterface { + fn new(state: SharedState) -> Self { + Self { state } + } +} + +#[derive(Clone)] +pub struct BatteryInterface { + state: SharedState, +} + +impl BatteryInterface { + fn new(state: SharedState) -> Self { + Self { state } + } +} + +#[derive(Clone)] +pub struct SolarInterface { + state: SharedState, +} + +impl SolarInterface { + fn new(state: SharedState) -> Self { + Self { state } + } +} + +#[derive(Clone)] +pub struct AcInputInterface { + state: SharedState, +} + +impl AcInputInterface { + fn new(state: SharedState) -> Self { + Self { state } + } +} + +#[derive(Clone)] +pub struct AcLoadInterface { + state: SharedState, +} + +impl AcLoadInterface { + fn new(state: SharedState) -> Self { + Self { state } + } +} + +#[derive(Clone)] +pub struct ConfigInterface { + #[allow(dead_code)] + state: SharedState, +} + +impl ConfigInterface { + fn new(state: SharedState) -> Self { + Self { state } + } +} + +// Status interface - Connected property +#[interface(name = "org.craftknight.CamperWidget.Status")] +impl StatusInterface { + #[zbus(property(emits_changed_signal = "true"))] + async fn connected(&self) -> bool { + *self.state.connected.read().await + } +} + +// Battery interface - SoC, Power, State properties +#[interface(name = "org.craftknight.CamperWidget.Battery")] +impl BatteryInterface { + #[zbus(property(emits_changed_signal = "true"))] + async fn soc(&self) -> f64 { + *self.state.battery_soc.read().await + } + + #[zbus(property(emits_changed_signal = "true"))] + async fn power(&self) -> f64 { + *self.state.battery_power.read().await + } + + #[zbus(property(emits_changed_signal = "true"))] + async fn state(&self) -> String { + self.state.battery_state.read().await.clone() + } +} + +// Solar interface - Power property +#[interface(name = "org.craftknight.CamperWidget.Solar")] +impl SolarInterface { + #[zbus(property(emits_changed_signal = "true"))] + async fn power(&self) -> f64 { + *self.state.solar_power.read().await + } +} + +// AC input interface - Power property +#[interface(name = "org.craftknight.CamperWidget.AcInput")] +impl AcInputInterface { + #[zbus(property(emits_changed_signal = "true"))] + async fn power(&self) -> f64 { + *self.state.ac_input_power.read().await + } +} + +// AC load interface - Power property +#[interface(name = "org.craftknight.CamperWidget.AcLoad")] +impl AcLoadInterface { + #[zbus(property(emits_changed_signal = "true"))] + async fn power(&self) -> f64 { + *self.state.ac_load_power.read().await + } +} + +// Config interface for handling configuration reloads +#[interface(name = "org.craftknight.CamperWidget.Config")] +impl ConfigInterface { + /// Reload configuration method + async fn reload(&self) -> zbus::fdo::Result<()> { + self.state.config_reload_notify.notify_one(); + Ok(()) + } +} + +// Start a session bus service and return the connection; keep it alive in main +pub async fn start_service() -> Result<(Connection, SharedState)> { + let shared = SharedState::default(); + + let status_server = StatusInterface::new(shared.clone()); + let battery_server = BatteryInterface::new(shared.clone()); + let solar_server = SolarInterface::new(shared.clone()); + let ac_input_server = AcInputInterface::new(shared.clone()); + let ac_load_server = AcLoadInterface::new(shared.clone()); + let config_server = ConfigInterface::new(shared.clone()); + + let conn = connection::Builder::session()? + .name("org.craftknight.CamperWidget")? + .serve_at(ROOT_OBJECT_PATH, status_server)? + .serve_at(ROOT_OBJECT_PATH, config_server)? + .serve_at(BATTERY_OBJECT_PATH, battery_server)? + .serve_at(SOLAR_OBJECT_PATH, solar_server)? + .serve_at(AC_INPUT_OBJECT_PATH, ac_input_server)? + .serve_at(AC_LOAD_OBJECT_PATH, ac_load_server)? + .build() + .await?; + + Ok((conn, shared)) +} + +/// Start a D-Bus service at a custom address (for integration testing). +pub async fn start_service_at(address: &str) -> Result<(Connection, SharedState)> { + let shared = SharedState::default(); + + let status_server = StatusInterface::new(shared.clone()); + let battery_server = BatteryInterface::new(shared.clone()); + let solar_server = SolarInterface::new(shared.clone()); + let ac_input_server = AcInputInterface::new(shared.clone()); + let ac_load_server = AcLoadInterface::new(shared.clone()); + let config_server = ConfigInterface::new(shared.clone()); + + let conn = connection::Builder::address(address)? + .name("org.craftknight.CamperWidget")? + .serve_at(ROOT_OBJECT_PATH, status_server)? + .serve_at(ROOT_OBJECT_PATH, config_server)? + .serve_at(BATTERY_OBJECT_PATH, battery_server)? + .serve_at(SOLAR_OBJECT_PATH, solar_server)? + .serve_at(AC_INPUT_OBJECT_PATH, ac_input_server)? + .serve_at(AC_LOAD_OBJECT_PATH, ac_load_server)? + .build() + .await?; + + Ok((conn, shared)) +} + +// Property types for individual updates +#[derive(Debug, Clone, PartialEq)] +pub enum PropertyType { + BatterySoc, + BatteryPower, + BatteryState, + SolarPower, + AcInputPower, + AcLoadPower, +} + +// Update connected status and emit signal +pub async fn update_connected_status( + conn: &Connection, + state: &SharedState, + connected: bool, +) -> Result<()> { + let object_server = conn.object_server(); + + let mut guard = state.connected.write().await; + if *guard != connected { + *guard = connected; + // Emit signal for connected status asynchronously + let object_server_clone = object_server.clone(); + match state.signal_semaphore.clone().try_acquire_owned() { + Ok(permit) => { + tokio::spawn(async move { + let _permit = permit; + if let Ok(iface_ref) = object_server_clone + .interface::<_, StatusInterface>(ROOT_OBJECT_PATH) + .await + { + let iface = iface_ref.get_mut().await; + if let Err(e) = iface.connected_changed(iface_ref.signal_emitter()).await { + error!("Connected: Failed to emit signal: {}", e); + } + } else { + error!("Connected: Failed to get interface at {}", ROOT_OBJECT_PATH); + } + }); + } + Err(_) => { + warn!("Signal emission backpressure: dropping signal for Connected"); + } + } + } + + Ok(()) +} + +// Update battery state from a string directly (used by Batteries topic) +pub async fn update_battery_state_str( + conn: &Connection, + state: &SharedState, + state_str: &str, +) -> Result<()> { + let object_server = conn.object_server(); + let mut guard = state.battery_state.write().await; + if *guard != state_str { + *guard = state_str.to_string(); + let object_server_clone = object_server.clone(); + match state.signal_semaphore.clone().try_acquire_owned() { + Ok(permit) => { + tokio::spawn(async move { + let _permit = permit; + if let Ok(iface_ref) = object_server_clone + .interface::<_, BatteryInterface>(BATTERY_OBJECT_PATH) + .await + { + let iface = iface_ref.get_mut().await; + if let Err(e) = iface.state_changed(iface_ref.signal_emitter()).await { + error!("BatteryState: Failed to emit signal: {}", e); + } + } + }); + } + Err(_) => { + warn!("Signal emission backpressure: dropping signal for BatteryState"); + } + } + } + Ok(()) +} + +// Update a single DBus property immediately +pub async fn update_single_property( + conn: &Connection, + state: &SharedState, + property_type: &PropertyType, + value: f64, +) -> Result<()> { + let object_server = conn.object_server(); + + match property_type { + PropertyType::BatterySoc => { + let mut guard = state.battery_soc.write().await; + if (*guard - value).abs() > f64::EPSILON { + *guard = value; + // Emit signal for battery SoC asynchronously + let object_server_clone = object_server.clone(); + match state.signal_semaphore.clone().try_acquire_owned() { + Ok(permit) => { + tokio::spawn(async move { + let _permit = permit; + if let Ok(iface_ref) = object_server_clone + .interface::<_, BatteryInterface>(BATTERY_OBJECT_PATH) + .await + { + let iface = iface_ref.get_mut().await; + if let Err(e) = iface.soc_changed(iface_ref.signal_emitter()).await + { + error!("BatterySoc: Failed to emit signal: {}", e); + } + } else { + error!( + "BatterySoc: Failed to get interface at {}", + BATTERY_OBJECT_PATH + ); + } + }); + } + Err(_) => { + warn!("Signal emission backpressure: dropping signal for BatterySoc"); + } + } + } + } + PropertyType::BatteryPower => { + let mut guard = state.battery_power.write().await; + if (*guard - value).abs() > f64::EPSILON { + *guard = value; + // Emit signal for battery power asynchronously + let object_server_clone = object_server.clone(); + match state.signal_semaphore.clone().try_acquire_owned() { + Ok(permit) => { + tokio::spawn(async move { + let _permit = permit; + if let Ok(iface_ref) = object_server_clone + .interface::<_, BatteryInterface>(BATTERY_OBJECT_PATH) + .await + { + let iface = iface_ref.get_mut().await; + if let Err(e) = + iface.power_changed(iface_ref.signal_emitter()).await + { + error!("BatteryPower: Failed to emit signal: {}", e); + } + } else { + error!( + "BatteryPower: Failed to get interface at {}", + BATTERY_OBJECT_PATH + ); + } + }); + } + Err(_) => { + warn!("Signal emission backpressure: dropping signal for BatteryPower"); + } + } + } + } + PropertyType::BatteryState => { + let state_str = if value > 5.0 { + "charging".to_string() + } else if value < -5.0 { + "discharging".to_string() + } else { + "idle".to_string() + }; + + let mut guard = state.battery_state.write().await; + if *guard != state_str { + *guard = state_str.clone(); + // Emit signal for battery state asynchronously + let object_server_clone = object_server.clone(); + match state.signal_semaphore.clone().try_acquire_owned() { + Ok(permit) => { + tokio::spawn(async move { + let _permit = permit; + if let Ok(iface_ref) = object_server_clone + .interface::<_, BatteryInterface>(BATTERY_OBJECT_PATH) + .await + { + let iface = iface_ref.get_mut().await; + if let Err(e) = + iface.state_changed(iface_ref.signal_emitter()).await + { + error!("BatteryState: Failed to emit signal: {}", e); + } + } else { + error!( + "BatteryState: Failed to get interface at {}", + BATTERY_OBJECT_PATH + ); + } + }); + } + Err(_) => { + warn!("Signal emission backpressure: dropping signal for BatteryState"); + } + } + } + } + PropertyType::SolarPower => { + let mut guard = state.solar_power.write().await; + if (*guard - value).abs() > f64::EPSILON { + *guard = value; + // Emit signal for solar power asynchronously + let object_server_clone = object_server.clone(); + match state.signal_semaphore.clone().try_acquire_owned() { + Ok(permit) => { + tokio::spawn(async move { + let _permit = permit; + if let Ok(iface_ref) = object_server_clone + .interface::<_, SolarInterface>(SOLAR_OBJECT_PATH) + .await + { + let iface = iface_ref.get_mut().await; + if let Err(e) = + iface.power_changed(iface_ref.signal_emitter()).await + { + error!("SolarPower: Failed to emit signal: {}", e); + } + } else { + error!( + "SolarPower: Failed to get interface at {}", + SOLAR_OBJECT_PATH + ); + } + }); + } + Err(_) => { + warn!("Signal emission backpressure: dropping signal for SolarPower"); + } + } + } + } + PropertyType::AcInputPower => { + let mut guard = state.ac_input_power.write().await; + if (*guard - value).abs() > f64::EPSILON { + *guard = value; + let object_server_clone = object_server.clone(); + match state.signal_semaphore.clone().try_acquire_owned() { + Ok(permit) => { + tokio::spawn(async move { + let _permit = permit; + if let Ok(iface_ref) = object_server_clone + .interface::<_, AcInputInterface>(AC_INPUT_OBJECT_PATH) + .await + { + let iface = iface_ref.get_mut().await; + if let Err(e) = + iface.power_changed(iface_ref.signal_emitter()).await + { + error!("AcInputPower: Failed to emit signal: {}", e); + } + } else { + error!( + "AcInputPower: Failed to get interface at {}", + AC_INPUT_OBJECT_PATH + ); + } + }); + } + Err(_) => { + warn!("Signal emission backpressure: dropping signal for AcInputPower"); + } + } + } + } + PropertyType::AcLoadPower => { + let mut guard = state.ac_load_power.write().await; + if (*guard - value).abs() > f64::EPSILON { + *guard = value; + let object_server_clone = object_server.clone(); + match state.signal_semaphore.clone().try_acquire_owned() { + Ok(permit) => { + tokio::spawn(async move { + let _permit = permit; + if let Ok(iface_ref) = object_server_clone + .interface::<_, AcLoadInterface>(AC_LOAD_OBJECT_PATH) + .await + { + let iface = iface_ref.get_mut().await; + if let Err(e) = + iface.power_changed(iface_ref.signal_emitter()).await + { + error!("AcLoadPower: Failed to emit signal: {}", e); + } + } else { + error!( + "AcLoadPower: Failed to get interface at {}", + AC_LOAD_OBJECT_PATH + ); + } + }); + } + Err(_) => { + warn!("Signal emission backpressure: dropping signal for AcLoadPower"); + } + } + } + } + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_shared_state_initialization() { + let state = SharedState::default(); + assert!(!(*state.connected.read().await)); + assert_eq!(*state.battery_soc.read().await, 0.0); + assert_eq!(*state.battery_power.read().await, 0.0); + assert_eq!(*state.battery_state.read().await, "idle"); + assert_eq!(*state.solar_power.read().await, 0.0); + assert_eq!(*state.ac_input_power.read().await, 0.0); + assert_eq!(*state.ac_load_power.read().await, 0.0); + } + + #[tokio::test] + async fn test_property_updates() { + let state = SharedState::default(); + + // Test connected status update + { + let mut guard = state.connected.write().await; + *guard = true; + } + assert!(*state.connected.read().await); + + // Test battery property updates + { + let mut soc_guard = state.battery_soc.write().await; + *soc_guard = 75.0; + } + { + let mut power_guard = state.battery_power.write().await; + *power_guard = -15.0; + } + { + let mut state_guard = state.battery_state.write().await; + *state_guard = "discharging".to_string(); + } + + assert_eq!(*state.battery_soc.read().await, 75.0); + assert_eq!(*state.battery_power.read().await, -15.0); + assert_eq!(*state.battery_state.read().await, "discharging"); + + // Test solar power update + { + let mut solar_guard = state.solar_power.write().await; + *solar_guard = 120.0; + } + assert_eq!(*state.solar_power.read().await, 120.0); + + // Test AC input power update + { + let mut ac_input_guard = state.ac_input_power.write().await; + *ac_input_guard = 82.0; + } + assert_eq!(*state.ac_input_power.read().await, 82.0); + + // Test AC load power update + { + let mut ac_load_guard = state.ac_load_power.write().await; + *ac_load_guard = 77.0; + } + assert_eq!(*state.ac_load_power.read().await, 77.0); + } + + #[tokio::test] + async fn test_interface_creation() { + let state = SharedState::default(); + + let _status_interface = StatusInterface::new(state.clone()); + let _battery_interface = BatteryInterface::new(state.clone()); + let _solar_interface = SolarInterface::new(state.clone()); + let _ac_input_interface = AcInputInterface::new(state.clone()); + let _ac_load_interface = AcLoadInterface::new(state.clone()); + let _config_interface = ConfigInterface::new(state.clone()); + + // Test that interfaces can be created without panicking + // If we get here, creation succeeded + } + + #[test] + fn test_property_change_detection() { + // Test floating point comparison logic without async + let value1: f64 = 50.0; + let value2: f64 = 50.1; + + assert!((value1 - 50.0_f64).abs() < f64::EPSILON); + assert!((value2 - 50.1_f64).abs() < f64::EPSILON); + assert!((value1 - value2).abs() > f64::EPSILON); + } + + #[tokio::test] + async fn test_battery_state_mapping() { + // Test the battery state mapping logic from main.rs + let test_cases = vec![ + (10.0, "charging"), + (-10.0, "discharging"), + (0.0, "idle"), + (3.0, "idle"), // Below threshold + (-3.0, "idle"), // Above negative threshold + ]; + + for (power, expected_state) in test_cases { + let direction = if power > 5.0 { + "charging".to_string() + } else if power < -5.0 { + "discharging".to_string() + } else { + "idle".to_string() + }; + + assert_eq!( + direction, expected_state, + "Power {power} should map to {expected_state}" + ); + } + } +} |
