use std::time::Duration; use anyhow::Result; use camper_widget_refresh::dbus::{ PropertyType, start_service_at, update_connected_status, update_single_property, }; use futures_util::StreamExt; use tokio::time::timeout; use zbus::{Connection, connection, proxy}; const TIMEOUT: Duration = Duration::from_secs(2); // ---------- Client-side proxy definitions ---------- #[proxy( interface = "org.craftknight.CamperWidget.Status", default_service = "org.craftknight.CamperWidget", default_path = "/org/craftknight/camper_widget" )] trait Status { #[zbus(property)] fn connected(&self) -> zbus::Result; } #[proxy( interface = "org.craftknight.CamperWidget.Battery", default_service = "org.craftknight.CamperWidget", default_path = "/org/craftknight/camper_widget/Battery" )] trait Battery { #[zbus(property)] fn soc(&self) -> zbus::Result; #[zbus(property)] fn power(&self) -> zbus::Result; #[zbus(property)] fn state(&self) -> zbus::Result; } #[proxy( interface = "org.craftknight.CamperWidget.Solar", default_service = "org.craftknight.CamperWidget", default_path = "/org/craftknight/camper_widget/Solar" )] trait Solar { #[zbus(property)] fn power(&self) -> zbus::Result; } #[proxy( interface = "org.craftknight.CamperWidget.AcInput", default_service = "org.craftknight.CamperWidget", default_path = "/org/craftknight/camper_widget/AcInput" )] trait AcInput { #[zbus(property)] fn power(&self) -> zbus::Result; } #[proxy( interface = "org.craftknight.CamperWidget.AcLoad", default_service = "org.craftknight.CamperWidget", default_path = "/org/craftknight/camper_widget/AcLoad" )] trait AcLoad { #[zbus(property)] fn power(&self) -> zbus::Result; } #[proxy( interface = "org.craftknight.CamperWidget.Config", default_service = "org.craftknight.CamperWidget", default_path = "/org/craftknight/camper_widget" )] trait Config { fn reload(&self) -> zbus::Result<()>; } // ---------- Test fixture ---------- struct DbusTestFixture { _daemon: dbus_launch::Daemon, address: String, } impl DbusTestFixture { fn new() -> Self { let daemon = dbus_launch::Launcher::daemon() .launch() .expect("failed to launch dbus-daemon — is dbus-daemon installed?"); let address = daemon.address().to_string(); Self { _daemon: daemon, address, } } fn address(&self) -> &str { &self.address } async fn client_connection(&self) -> Result { let conn = connection::Builder::address(self.address())? .build() .await?; Ok(conn) } } // ---------- Tests ---------- #[tokio::test] async fn test_default_property_values() -> Result<()> { let fixture = DbusTestFixture::new(); let (_conn, _state) = start_service_at(fixture.address()).await?; let client = fixture.client_connection().await?; let status = StatusProxy::new(&client).await?; let battery = BatteryProxy::new(&client).await?; let solar = SolarProxy::new(&client).await?; let ac_input = AcInputProxy::new(&client).await?; let ac_load = AcLoadProxy::new(&client).await?; assert!(!status.connected().await?); assert_eq!(battery.soc().await?, 0.0); assert_eq!(battery.power().await?, 0.0); assert_eq!(battery.state().await?, "idle"); assert_eq!(solar.power().await?, 0.0); assert_eq!(ac_input.power().await?, 0.0); assert_eq!(ac_load.power().await?, 0.0); Ok(()) } #[tokio::test] async fn test_connected_status_update() -> Result<()> { let fixture = DbusTestFixture::new(); let (conn, state) = start_service_at(fixture.address()).await?; let client = fixture.client_connection().await?; let status = StatusProxy::new(&client).await?; assert!(!status.connected().await?); // Subscribe to changes before mutating let mut stream = status.receive_connected_changed().await; update_connected_status(&conn, &state, true).await?; // Wait for a property-changed signal with the expected value. // The proxy may deliver a cached/initial signal first, so loop until // we see the value we expect or timeout. let deadline = tokio::time::Instant::now() + TIMEOUT; loop { let remaining = deadline.saturating_duration_since(tokio::time::Instant::now()); if remaining.is_zero() { panic!("timed out waiting for connected=true signal"); } let signal = timeout(remaining, stream.next()).await?; let changed = signal.expect("stream ended unexpectedly"); if changed.get().await? { break; // got the expected value } } Ok(()) } #[tokio::test] async fn test_battery_soc_update() -> Result<()> { let fixture = DbusTestFixture::new(); let (conn, state) = start_service_at(fixture.address()).await?; let client = fixture.client_connection().await?; let battery = BatteryProxy::new(&client).await?; let mut stream = battery.receive_soc_changed().await; update_single_property(&conn, &state, &PropertyType::BatterySoc, 85.5).await?; let signal = timeout(TIMEOUT, stream.next()).await?; assert!(signal.is_some()); assert!((signal.unwrap().get().await? - 85.5).abs() < f64::EPSILON); Ok(()) } #[tokio::test] async fn test_battery_power_and_state_derivation() -> Result<()> { let fixture = DbusTestFixture::new(); let (conn, state) = start_service_at(fixture.address()).await?; let client = fixture.client_connection().await?; let battery = BatteryProxy::new(&client).await?; let mut state_stream = battery.receive_state_changed().await; // Positive power above threshold -> charging update_single_property(&conn, &state, &PropertyType::BatteryPower, 12.0).await?; update_single_property(&conn, &state, &PropertyType::BatteryState, 12.0).await?; let signal = timeout(TIMEOUT, state_stream.next()).await?; assert!(signal.is_some()); assert_eq!(signal.unwrap().get().await?, "charging"); // Negative power below threshold -> discharging update_single_property(&conn, &state, &PropertyType::BatteryPower, -12.0).await?; update_single_property(&conn, &state, &PropertyType::BatteryState, -12.0).await?; let signal = timeout(TIMEOUT, state_stream.next()).await?; assert!(signal.is_some()); assert_eq!(signal.unwrap().get().await?, "discharging"); // Small power within threshold -> idle update_single_property(&conn, &state, &PropertyType::BatteryPower, 3.0).await?; update_single_property(&conn, &state, &PropertyType::BatteryState, 3.0).await?; let signal = timeout(TIMEOUT, state_stream.next()).await?; assert!(signal.is_some()); assert_eq!(signal.unwrap().get().await?, "idle"); Ok(()) } #[tokio::test] async fn test_solar_power_update() -> Result<()> { let fixture = DbusTestFixture::new(); let (conn, state) = start_service_at(fixture.address()).await?; let client = fixture.client_connection().await?; let solar = SolarProxy::new(&client).await?; let mut stream = solar.receive_power_changed().await; update_single_property(&conn, &state, &PropertyType::SolarPower, 250.0).await?; let signal = timeout(TIMEOUT, stream.next()).await?; assert!(signal.is_some()); assert!((signal.unwrap().get().await? - 250.0).abs() < f64::EPSILON); Ok(()) } #[tokio::test] async fn test_ac_input_power_update() -> Result<()> { let fixture = DbusTestFixture::new(); let (conn, state) = start_service_at(fixture.address()).await?; let client = fixture.client_connection().await?; let ac_input = AcInputProxy::new(&client).await?; let mut stream = ac_input.receive_power_changed().await; update_single_property(&conn, &state, &PropertyType::AcInputPower, 82.0).await?; let signal = timeout(TIMEOUT, stream.next()).await?; assert!(signal.is_some()); assert!((signal.unwrap().get().await? - 82.0).abs() < f64::EPSILON); Ok(()) } #[tokio::test] async fn test_ac_load_power_update() -> Result<()> { let fixture = DbusTestFixture::new(); let (conn, state) = start_service_at(fixture.address()).await?; let client = fixture.client_connection().await?; let ac_load = AcLoadProxy::new(&client).await?; let mut stream = ac_load.receive_power_changed().await; update_single_property(&conn, &state, &PropertyType::AcLoadPower, 77.0).await?; let signal = timeout(TIMEOUT, stream.next()).await?; assert!(signal.is_some()); assert!((signal.unwrap().get().await? - 77.0).abs() < f64::EPSILON); Ok(()) } #[tokio::test] async fn test_config_reload_method() -> Result<()> { let fixture = DbusTestFixture::new(); let (_conn, state) = start_service_at(fixture.address()).await?; let client = fixture.client_connection().await?; let config = ConfigProxy::new(&client).await?; // Spawn a listener for the notify signal let notify = state.config_reload_notify.clone(); let listener = tokio::spawn(async move { notify.notified().await }); // Call reload via D-Bus config.reload().await?; // The listener should complete within the timeout timeout(TIMEOUT, listener).await??; Ok(()) } #[tokio::test] async fn test_property_change_signal_emitted() -> Result<()> { let fixture = DbusTestFixture::new(); let (conn, state) = start_service_at(fixture.address()).await?; let client = fixture.client_connection().await?; let battery = BatteryProxy::new(&client).await?; // Subscribe to soc property changes before making updates let mut stream = battery.receive_soc_changed().await; // Update the property update_single_property(&conn, &state, &PropertyType::BatterySoc, 50.0).await?; // Should receive the change signal let signal = timeout(TIMEOUT, stream.next()).await?; assert!(signal.is_some()); let changed = signal.unwrap(); assert!((changed.get().await? - 50.0).abs() < f64::EPSILON); Ok(()) }