summaryrefslogtreecommitdiff
path: root/service/src/dbus.rs
diff options
context:
space:
mode:
Diffstat (limited to 'service/src/dbus.rs')
-rw-r--r--service/src/dbus.rs667
1 files changed, 667 insertions, 0 deletions
diff --git a/service/src/dbus.rs b/service/src/dbus.rs
new file mode 100644
index 0000000..2a07134
--- /dev/null
+++ b/service/src/dbus.rs
@@ -0,0 +1,667 @@
+use std::sync::Arc;
+
+use anyhow::Result;
+use tokio::sync::{Notify, RwLock, Semaphore};
+
+use tracing::{error, warn};
+use zbus::{Connection, connection, interface};
+
+// Object paths
+pub const ROOT_OBJECT_PATH: &str = "/org/craftknight/camper_widget";
+pub const BATTERY_OBJECT_PATH: &str = "/org/craftknight/camper_widget/Battery";
+pub const SOLAR_OBJECT_PATH: &str = "/org/craftknight/camper_widget/Solar";
+pub const AC_INPUT_OBJECT_PATH: &str = "/org/craftknight/camper_widget/AcInput";
+pub const AC_LOAD_OBJECT_PATH: &str = "/org/craftknight/camper_widget/AcLoad";
+
+#[derive(Clone)]
+pub struct SharedState {
+ // Status interface data
+ pub connected: Arc<RwLock<bool>>,
+
+ // Battery interface data
+ pub battery_soc: Arc<RwLock<f64>>,
+ pub battery_power: Arc<RwLock<f64>>,
+ pub battery_state: Arc<RwLock<String>>,
+
+ // Solar interface data
+ pub solar_power: Arc<RwLock<f64>>,
+
+ // AC input interface data
+ pub ac_input_power: Arc<RwLock<f64>>,
+
+ // AC load interface data
+ pub ac_load_power: Arc<RwLock<f64>>,
+
+ // Config reload notification
+ pub config_reload_notify: Arc<Notify>,
+
+ // Backpressure for D-Bus signal emission
+ pub signal_semaphore: Arc<Semaphore>,
+}
+
+impl Default for SharedState {
+ fn default() -> Self {
+ Self {
+ connected: Arc::new(RwLock::new(false)),
+ battery_soc: Arc::new(RwLock::new(0.0)),
+ battery_power: Arc::new(RwLock::new(0.0)),
+ battery_state: Arc::new(RwLock::new("idle".to_string())),
+ solar_power: Arc::new(RwLock::new(0.0)),
+ ac_input_power: Arc::new(RwLock::new(0.0)),
+ ac_load_power: Arc::new(RwLock::new(0.0)),
+ config_reload_notify: Arc::new(Notify::new()),
+ signal_semaphore: Arc::new(Semaphore::new(10)),
+ }
+ }
+}
+
+#[derive(Clone)]
+pub struct StatusInterface {
+ state: SharedState,
+}
+
+impl StatusInterface {
+ fn new(state: SharedState) -> Self {
+ Self { state }
+ }
+}
+
+#[derive(Clone)]
+pub struct BatteryInterface {
+ state: SharedState,
+}
+
+impl BatteryInterface {
+ fn new(state: SharedState) -> Self {
+ Self { state }
+ }
+}
+
+#[derive(Clone)]
+pub struct SolarInterface {
+ state: SharedState,
+}
+
+impl SolarInterface {
+ fn new(state: SharedState) -> Self {
+ Self { state }
+ }
+}
+
+#[derive(Clone)]
+pub struct AcInputInterface {
+ state: SharedState,
+}
+
+impl AcInputInterface {
+ fn new(state: SharedState) -> Self {
+ Self { state }
+ }
+}
+
+#[derive(Clone)]
+pub struct AcLoadInterface {
+ state: SharedState,
+}
+
+impl AcLoadInterface {
+ fn new(state: SharedState) -> Self {
+ Self { state }
+ }
+}
+
+#[derive(Clone)]
+pub struct ConfigInterface {
+ #[allow(dead_code)]
+ state: SharedState,
+}
+
+impl ConfigInterface {
+ fn new(state: SharedState) -> Self {
+ Self { state }
+ }
+}
+
+// Status interface - Connected property
+#[interface(name = "org.craftknight.CamperWidget.Status")]
+impl StatusInterface {
+ #[zbus(property(emits_changed_signal = "true"))]
+ async fn connected(&self) -> bool {
+ *self.state.connected.read().await
+ }
+}
+
+// Battery interface - SoC, Power, State properties
+#[interface(name = "org.craftknight.CamperWidget.Battery")]
+impl BatteryInterface {
+ #[zbus(property(emits_changed_signal = "true"))]
+ async fn soc(&self) -> f64 {
+ *self.state.battery_soc.read().await
+ }
+
+ #[zbus(property(emits_changed_signal = "true"))]
+ async fn power(&self) -> f64 {
+ *self.state.battery_power.read().await
+ }
+
+ #[zbus(property(emits_changed_signal = "true"))]
+ async fn state(&self) -> String {
+ self.state.battery_state.read().await.clone()
+ }
+}
+
+// Solar interface - Power property
+#[interface(name = "org.craftknight.CamperWidget.Solar")]
+impl SolarInterface {
+ #[zbus(property(emits_changed_signal = "true"))]
+ async fn power(&self) -> f64 {
+ *self.state.solar_power.read().await
+ }
+}
+
+// AC input interface - Power property
+#[interface(name = "org.craftknight.CamperWidget.AcInput")]
+impl AcInputInterface {
+ #[zbus(property(emits_changed_signal = "true"))]
+ async fn power(&self) -> f64 {
+ *self.state.ac_input_power.read().await
+ }
+}
+
+// AC load interface - Power property
+#[interface(name = "org.craftknight.CamperWidget.AcLoad")]
+impl AcLoadInterface {
+ #[zbus(property(emits_changed_signal = "true"))]
+ async fn power(&self) -> f64 {
+ *self.state.ac_load_power.read().await
+ }
+}
+
+// Config interface for handling configuration reloads
+#[interface(name = "org.craftknight.CamperWidget.Config")]
+impl ConfigInterface {
+ /// Reload configuration method
+ async fn reload(&self) -> zbus::fdo::Result<()> {
+ self.state.config_reload_notify.notify_one();
+ Ok(())
+ }
+}
+
+// Start a session bus service and return the connection; keep it alive in main
+pub async fn start_service() -> Result<(Connection, SharedState)> {
+ let shared = SharedState::default();
+
+ let status_server = StatusInterface::new(shared.clone());
+ let battery_server = BatteryInterface::new(shared.clone());
+ let solar_server = SolarInterface::new(shared.clone());
+ let ac_input_server = AcInputInterface::new(shared.clone());
+ let ac_load_server = AcLoadInterface::new(shared.clone());
+ let config_server = ConfigInterface::new(shared.clone());
+
+ let conn = connection::Builder::session()?
+ .name("org.craftknight.CamperWidget")?
+ .serve_at(ROOT_OBJECT_PATH, status_server)?
+ .serve_at(ROOT_OBJECT_PATH, config_server)?
+ .serve_at(BATTERY_OBJECT_PATH, battery_server)?
+ .serve_at(SOLAR_OBJECT_PATH, solar_server)?
+ .serve_at(AC_INPUT_OBJECT_PATH, ac_input_server)?
+ .serve_at(AC_LOAD_OBJECT_PATH, ac_load_server)?
+ .build()
+ .await?;
+
+ Ok((conn, shared))
+}
+
+/// Start a D-Bus service at a custom address (for integration testing).
+pub async fn start_service_at(address: &str) -> Result<(Connection, SharedState)> {
+ let shared = SharedState::default();
+
+ let status_server = StatusInterface::new(shared.clone());
+ let battery_server = BatteryInterface::new(shared.clone());
+ let solar_server = SolarInterface::new(shared.clone());
+ let ac_input_server = AcInputInterface::new(shared.clone());
+ let ac_load_server = AcLoadInterface::new(shared.clone());
+ let config_server = ConfigInterface::new(shared.clone());
+
+ let conn = connection::Builder::address(address)?
+ .name("org.craftknight.CamperWidget")?
+ .serve_at(ROOT_OBJECT_PATH, status_server)?
+ .serve_at(ROOT_OBJECT_PATH, config_server)?
+ .serve_at(BATTERY_OBJECT_PATH, battery_server)?
+ .serve_at(SOLAR_OBJECT_PATH, solar_server)?
+ .serve_at(AC_INPUT_OBJECT_PATH, ac_input_server)?
+ .serve_at(AC_LOAD_OBJECT_PATH, ac_load_server)?
+ .build()
+ .await?;
+
+ Ok((conn, shared))
+}
+
+// Property types for individual updates
+#[derive(Debug, Clone, PartialEq)]
+pub enum PropertyType {
+ BatterySoc,
+ BatteryPower,
+ BatteryState,
+ SolarPower,
+ AcInputPower,
+ AcLoadPower,
+}
+
+// Update connected status and emit signal
+pub async fn update_connected_status(
+ conn: &Connection,
+ state: &SharedState,
+ connected: bool,
+) -> Result<()> {
+ let object_server = conn.object_server();
+
+ let mut guard = state.connected.write().await;
+ if *guard != connected {
+ *guard = connected;
+ // Emit signal for connected status asynchronously
+ let object_server_clone = object_server.clone();
+ match state.signal_semaphore.clone().try_acquire_owned() {
+ Ok(permit) => {
+ tokio::spawn(async move {
+ let _permit = permit;
+ if let Ok(iface_ref) = object_server_clone
+ .interface::<_, StatusInterface>(ROOT_OBJECT_PATH)
+ .await
+ {
+ let iface = iface_ref.get_mut().await;
+ if let Err(e) = iface.connected_changed(iface_ref.signal_emitter()).await {
+ error!("Connected: Failed to emit signal: {}", e);
+ }
+ } else {
+ error!("Connected: Failed to get interface at {}", ROOT_OBJECT_PATH);
+ }
+ });
+ }
+ Err(_) => {
+ warn!("Signal emission backpressure: dropping signal for Connected");
+ }
+ }
+ }
+
+ Ok(())
+}
+
+// Update battery state from a string directly (used by Batteries topic)
+pub async fn update_battery_state_str(
+ conn: &Connection,
+ state: &SharedState,
+ state_str: &str,
+) -> Result<()> {
+ let object_server = conn.object_server();
+ let mut guard = state.battery_state.write().await;
+ if *guard != state_str {
+ *guard = state_str.to_string();
+ let object_server_clone = object_server.clone();
+ match state.signal_semaphore.clone().try_acquire_owned() {
+ Ok(permit) => {
+ tokio::spawn(async move {
+ let _permit = permit;
+ if let Ok(iface_ref) = object_server_clone
+ .interface::<_, BatteryInterface>(BATTERY_OBJECT_PATH)
+ .await
+ {
+ let iface = iface_ref.get_mut().await;
+ if let Err(e) = iface.state_changed(iface_ref.signal_emitter()).await {
+ error!("BatteryState: Failed to emit signal: {}", e);
+ }
+ }
+ });
+ }
+ Err(_) => {
+ warn!("Signal emission backpressure: dropping signal for BatteryState");
+ }
+ }
+ }
+ Ok(())
+}
+
+// Update a single DBus property immediately
+pub async fn update_single_property(
+ conn: &Connection,
+ state: &SharedState,
+ property_type: &PropertyType,
+ value: f64,
+) -> Result<()> {
+ let object_server = conn.object_server();
+
+ match property_type {
+ PropertyType::BatterySoc => {
+ let mut guard = state.battery_soc.write().await;
+ if (*guard - value).abs() > f64::EPSILON {
+ *guard = value;
+ // Emit signal for battery SoC asynchronously
+ let object_server_clone = object_server.clone();
+ match state.signal_semaphore.clone().try_acquire_owned() {
+ Ok(permit) => {
+ tokio::spawn(async move {
+ let _permit = permit;
+ if let Ok(iface_ref) = object_server_clone
+ .interface::<_, BatteryInterface>(BATTERY_OBJECT_PATH)
+ .await
+ {
+ let iface = iface_ref.get_mut().await;
+ if let Err(e) = iface.soc_changed(iface_ref.signal_emitter()).await
+ {
+ error!("BatterySoc: Failed to emit signal: {}", e);
+ }
+ } else {
+ error!(
+ "BatterySoc: Failed to get interface at {}",
+ BATTERY_OBJECT_PATH
+ );
+ }
+ });
+ }
+ Err(_) => {
+ warn!("Signal emission backpressure: dropping signal for BatterySoc");
+ }
+ }
+ }
+ }
+ PropertyType::BatteryPower => {
+ let mut guard = state.battery_power.write().await;
+ if (*guard - value).abs() > f64::EPSILON {
+ *guard = value;
+ // Emit signal for battery power asynchronously
+ let object_server_clone = object_server.clone();
+ match state.signal_semaphore.clone().try_acquire_owned() {
+ Ok(permit) => {
+ tokio::spawn(async move {
+ let _permit = permit;
+ if let Ok(iface_ref) = object_server_clone
+ .interface::<_, BatteryInterface>(BATTERY_OBJECT_PATH)
+ .await
+ {
+ let iface = iface_ref.get_mut().await;
+ if let Err(e) =
+ iface.power_changed(iface_ref.signal_emitter()).await
+ {
+ error!("BatteryPower: Failed to emit signal: {}", e);
+ }
+ } else {
+ error!(
+ "BatteryPower: Failed to get interface at {}",
+ BATTERY_OBJECT_PATH
+ );
+ }
+ });
+ }
+ Err(_) => {
+ warn!("Signal emission backpressure: dropping signal for BatteryPower");
+ }
+ }
+ }
+ }
+ PropertyType::BatteryState => {
+ let state_str = if value > 5.0 {
+ "charging".to_string()
+ } else if value < -5.0 {
+ "discharging".to_string()
+ } else {
+ "idle".to_string()
+ };
+
+ let mut guard = state.battery_state.write().await;
+ if *guard != state_str {
+ *guard = state_str.clone();
+ // Emit signal for battery state asynchronously
+ let object_server_clone = object_server.clone();
+ match state.signal_semaphore.clone().try_acquire_owned() {
+ Ok(permit) => {
+ tokio::spawn(async move {
+ let _permit = permit;
+ if let Ok(iface_ref) = object_server_clone
+ .interface::<_, BatteryInterface>(BATTERY_OBJECT_PATH)
+ .await
+ {
+ let iface = iface_ref.get_mut().await;
+ if let Err(e) =
+ iface.state_changed(iface_ref.signal_emitter()).await
+ {
+ error!("BatteryState: Failed to emit signal: {}", e);
+ }
+ } else {
+ error!(
+ "BatteryState: Failed to get interface at {}",
+ BATTERY_OBJECT_PATH
+ );
+ }
+ });
+ }
+ Err(_) => {
+ warn!("Signal emission backpressure: dropping signal for BatteryState");
+ }
+ }
+ }
+ }
+ PropertyType::SolarPower => {
+ let mut guard = state.solar_power.write().await;
+ if (*guard - value).abs() > f64::EPSILON {
+ *guard = value;
+ // Emit signal for solar power asynchronously
+ let object_server_clone = object_server.clone();
+ match state.signal_semaphore.clone().try_acquire_owned() {
+ Ok(permit) => {
+ tokio::spawn(async move {
+ let _permit = permit;
+ if let Ok(iface_ref) = object_server_clone
+ .interface::<_, SolarInterface>(SOLAR_OBJECT_PATH)
+ .await
+ {
+ let iface = iface_ref.get_mut().await;
+ if let Err(e) =
+ iface.power_changed(iface_ref.signal_emitter()).await
+ {
+ error!("SolarPower: Failed to emit signal: {}", e);
+ }
+ } else {
+ error!(
+ "SolarPower: Failed to get interface at {}",
+ SOLAR_OBJECT_PATH
+ );
+ }
+ });
+ }
+ Err(_) => {
+ warn!("Signal emission backpressure: dropping signal for SolarPower");
+ }
+ }
+ }
+ }
+ PropertyType::AcInputPower => {
+ let mut guard = state.ac_input_power.write().await;
+ if (*guard - value).abs() > f64::EPSILON {
+ *guard = value;
+ let object_server_clone = object_server.clone();
+ match state.signal_semaphore.clone().try_acquire_owned() {
+ Ok(permit) => {
+ tokio::spawn(async move {
+ let _permit = permit;
+ if let Ok(iface_ref) = object_server_clone
+ .interface::<_, AcInputInterface>(AC_INPUT_OBJECT_PATH)
+ .await
+ {
+ let iface = iface_ref.get_mut().await;
+ if let Err(e) =
+ iface.power_changed(iface_ref.signal_emitter()).await
+ {
+ error!("AcInputPower: Failed to emit signal: {}", e);
+ }
+ } else {
+ error!(
+ "AcInputPower: Failed to get interface at {}",
+ AC_INPUT_OBJECT_PATH
+ );
+ }
+ });
+ }
+ Err(_) => {
+ warn!("Signal emission backpressure: dropping signal for AcInputPower");
+ }
+ }
+ }
+ }
+ PropertyType::AcLoadPower => {
+ let mut guard = state.ac_load_power.write().await;
+ if (*guard - value).abs() > f64::EPSILON {
+ *guard = value;
+ let object_server_clone = object_server.clone();
+ match state.signal_semaphore.clone().try_acquire_owned() {
+ Ok(permit) => {
+ tokio::spawn(async move {
+ let _permit = permit;
+ if let Ok(iface_ref) = object_server_clone
+ .interface::<_, AcLoadInterface>(AC_LOAD_OBJECT_PATH)
+ .await
+ {
+ let iface = iface_ref.get_mut().await;
+ if let Err(e) =
+ iface.power_changed(iface_ref.signal_emitter()).await
+ {
+ error!("AcLoadPower: Failed to emit signal: {}", e);
+ }
+ } else {
+ error!(
+ "AcLoadPower: Failed to get interface at {}",
+ AC_LOAD_OBJECT_PATH
+ );
+ }
+ });
+ }
+ Err(_) => {
+ warn!("Signal emission backpressure: dropping signal for AcLoadPower");
+ }
+ }
+ }
+ }
+ }
+
+ Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[tokio::test]
+ async fn test_shared_state_initialization() {
+ let state = SharedState::default();
+ assert!(!(*state.connected.read().await));
+ assert_eq!(*state.battery_soc.read().await, 0.0);
+ assert_eq!(*state.battery_power.read().await, 0.0);
+ assert_eq!(*state.battery_state.read().await, "idle");
+ assert_eq!(*state.solar_power.read().await, 0.0);
+ assert_eq!(*state.ac_input_power.read().await, 0.0);
+ assert_eq!(*state.ac_load_power.read().await, 0.0);
+ }
+
+ #[tokio::test]
+ async fn test_property_updates() {
+ let state = SharedState::default();
+
+ // Test connected status update
+ {
+ let mut guard = state.connected.write().await;
+ *guard = true;
+ }
+ assert!(*state.connected.read().await);
+
+ // Test battery property updates
+ {
+ let mut soc_guard = state.battery_soc.write().await;
+ *soc_guard = 75.0;
+ }
+ {
+ let mut power_guard = state.battery_power.write().await;
+ *power_guard = -15.0;
+ }
+ {
+ let mut state_guard = state.battery_state.write().await;
+ *state_guard = "discharging".to_string();
+ }
+
+ assert_eq!(*state.battery_soc.read().await, 75.0);
+ assert_eq!(*state.battery_power.read().await, -15.0);
+ assert_eq!(*state.battery_state.read().await, "discharging");
+
+ // Test solar power update
+ {
+ let mut solar_guard = state.solar_power.write().await;
+ *solar_guard = 120.0;
+ }
+ assert_eq!(*state.solar_power.read().await, 120.0);
+
+ // Test AC input power update
+ {
+ let mut ac_input_guard = state.ac_input_power.write().await;
+ *ac_input_guard = 82.0;
+ }
+ assert_eq!(*state.ac_input_power.read().await, 82.0);
+
+ // Test AC load power update
+ {
+ let mut ac_load_guard = state.ac_load_power.write().await;
+ *ac_load_guard = 77.0;
+ }
+ assert_eq!(*state.ac_load_power.read().await, 77.0);
+ }
+
+ #[tokio::test]
+ async fn test_interface_creation() {
+ let state = SharedState::default();
+
+ let _status_interface = StatusInterface::new(state.clone());
+ let _battery_interface = BatteryInterface::new(state.clone());
+ let _solar_interface = SolarInterface::new(state.clone());
+ let _ac_input_interface = AcInputInterface::new(state.clone());
+ let _ac_load_interface = AcLoadInterface::new(state.clone());
+ let _config_interface = ConfigInterface::new(state.clone());
+
+ // Test that interfaces can be created without panicking
+ // If we get here, creation succeeded
+ }
+
+ #[test]
+ fn test_property_change_detection() {
+ // Test floating point comparison logic without async
+ let value1: f64 = 50.0;
+ let value2: f64 = 50.1;
+
+ assert!((value1 - 50.0_f64).abs() < f64::EPSILON);
+ assert!((value2 - 50.1_f64).abs() < f64::EPSILON);
+ assert!((value1 - value2).abs() > f64::EPSILON);
+ }
+
+ #[tokio::test]
+ async fn test_battery_state_mapping() {
+ // Test the battery state mapping logic from main.rs
+ let test_cases = vec![
+ (10.0, "charging"),
+ (-10.0, "discharging"),
+ (0.0, "idle"),
+ (3.0, "idle"), // Below threshold
+ (-3.0, "idle"), // Above negative threshold
+ ];
+
+ for (power, expected_state) in test_cases {
+ let direction = if power > 5.0 {
+ "charging".to_string()
+ } else if power < -5.0 {
+ "discharging".to_string()
+ } else {
+ "idle".to_string()
+ };
+
+ assert_eq!(
+ direction, expected_state,
+ "Power {power} should map to {expected_state}"
+ );
+ }
+ }
+}