diff options
Diffstat (limited to 'service/tests')
| -rw-r--r-- | service/tests/dbus_integration.rs | 325 | ||||
| -rw-r--r-- | service/tests/mqtt_to_dbus_e2e.rs | 641 |
2 files changed, 966 insertions, 0 deletions
diff --git a/service/tests/dbus_integration.rs b/service/tests/dbus_integration.rs new file mode 100644 index 0000000..53ac25b --- /dev/null +++ b/service/tests/dbus_integration.rs @@ -0,0 +1,325 @@ +use std::time::Duration; + +use anyhow::Result; +use camper_widget_refresh::dbus::{ + PropertyType, start_service_at, update_connected_status, update_single_property, +}; +use futures_util::StreamExt; +use tokio::time::timeout; +use zbus::{Connection, connection, proxy}; + +const TIMEOUT: Duration = Duration::from_secs(2); + +// ---------- Client-side proxy definitions ---------- + +#[proxy( + interface = "org.craftknight.CamperWidget.Status", + default_service = "org.craftknight.CamperWidget", + default_path = "/org/craftknight/camper_widget" +)] +trait Status { + #[zbus(property)] + fn connected(&self) -> zbus::Result<bool>; +} + +#[proxy( + interface = "org.craftknight.CamperWidget.Battery", + default_service = "org.craftknight.CamperWidget", + default_path = "/org/craftknight/camper_widget/Battery" +)] +trait Battery { + #[zbus(property)] + fn soc(&self) -> zbus::Result<f64>; + + #[zbus(property)] + fn power(&self) -> zbus::Result<f64>; + + #[zbus(property)] + fn state(&self) -> zbus::Result<String>; +} + +#[proxy( + interface = "org.craftknight.CamperWidget.Solar", + default_service = "org.craftknight.CamperWidget", + default_path = "/org/craftknight/camper_widget/Solar" +)] +trait Solar { + #[zbus(property)] + fn power(&self) -> zbus::Result<f64>; +} + +#[proxy( + interface = "org.craftknight.CamperWidget.AcInput", + default_service = "org.craftknight.CamperWidget", + default_path = "/org/craftknight/camper_widget/AcInput" +)] +trait AcInput { + #[zbus(property)] + fn power(&self) -> zbus::Result<f64>; +} + +#[proxy( + interface = "org.craftknight.CamperWidget.AcLoad", + default_service = "org.craftknight.CamperWidget", + default_path = "/org/craftknight/camper_widget/AcLoad" +)] +trait AcLoad { + #[zbus(property)] + fn power(&self) -> zbus::Result<f64>; +} + +#[proxy( + interface = "org.craftknight.CamperWidget.Config", + default_service = "org.craftknight.CamperWidget", + default_path = "/org/craftknight/camper_widget" +)] +trait Config { + fn reload(&self) -> zbus::Result<()>; +} + +// ---------- Test fixture ---------- + +struct DbusTestFixture { + _daemon: dbus_launch::Daemon, + address: String, +} + +impl DbusTestFixture { + fn new() -> Self { + let daemon = dbus_launch::Launcher::daemon() + .launch() + .expect("failed to launch dbus-daemon — is dbus-daemon installed?"); + let address = daemon.address().to_string(); + Self { + _daemon: daemon, + address, + } + } + + fn address(&self) -> &str { + &self.address + } + + async fn client_connection(&self) -> Result<Connection> { + let conn = connection::Builder::address(self.address())? + .build() + .await?; + Ok(conn) + } +} + +// ---------- Tests ---------- + +#[tokio::test] +async fn test_default_property_values() -> Result<()> { + let fixture = DbusTestFixture::new(); + let (_conn, _state) = start_service_at(fixture.address()).await?; + + let client = fixture.client_connection().await?; + let status = StatusProxy::new(&client).await?; + let battery = BatteryProxy::new(&client).await?; + let solar = SolarProxy::new(&client).await?; + let ac_input = AcInputProxy::new(&client).await?; + let ac_load = AcLoadProxy::new(&client).await?; + + assert!(!status.connected().await?); + assert_eq!(battery.soc().await?, 0.0); + assert_eq!(battery.power().await?, 0.0); + assert_eq!(battery.state().await?, "idle"); + assert_eq!(solar.power().await?, 0.0); + assert_eq!(ac_input.power().await?, 0.0); + assert_eq!(ac_load.power().await?, 0.0); + + Ok(()) +} + +#[tokio::test] +async fn test_connected_status_update() -> Result<()> { + let fixture = DbusTestFixture::new(); + let (conn, state) = start_service_at(fixture.address()).await?; + + let client = fixture.client_connection().await?; + let status = StatusProxy::new(&client).await?; + + assert!(!status.connected().await?); + + // Subscribe to changes before mutating + let mut stream = status.receive_connected_changed().await; + + update_connected_status(&conn, &state, true).await?; + + // Wait for a property-changed signal with the expected value. + // The proxy may deliver a cached/initial signal first, so loop until + // we see the value we expect or timeout. + let deadline = tokio::time::Instant::now() + TIMEOUT; + loop { + let remaining = deadline.saturating_duration_since(tokio::time::Instant::now()); + if remaining.is_zero() { + panic!("timed out waiting for connected=true signal"); + } + let signal = timeout(remaining, stream.next()).await?; + let changed = signal.expect("stream ended unexpectedly"); + if changed.get().await? { + break; // got the expected value + } + } + + Ok(()) +} + +#[tokio::test] +async fn test_battery_soc_update() -> Result<()> { + let fixture = DbusTestFixture::new(); + let (conn, state) = start_service_at(fixture.address()).await?; + + let client = fixture.client_connection().await?; + let battery = BatteryProxy::new(&client).await?; + + let mut stream = battery.receive_soc_changed().await; + + update_single_property(&conn, &state, &PropertyType::BatterySoc, 85.5).await?; + + let signal = timeout(TIMEOUT, stream.next()).await?; + assert!(signal.is_some()); + assert!((signal.unwrap().get().await? - 85.5).abs() < f64::EPSILON); + + Ok(()) +} + +#[tokio::test] +async fn test_battery_power_and_state_derivation() -> Result<()> { + let fixture = DbusTestFixture::new(); + let (conn, state) = start_service_at(fixture.address()).await?; + + let client = fixture.client_connection().await?; + let battery = BatteryProxy::new(&client).await?; + + let mut state_stream = battery.receive_state_changed().await; + + // Positive power above threshold -> charging + update_single_property(&conn, &state, &PropertyType::BatteryPower, 12.0).await?; + update_single_property(&conn, &state, &PropertyType::BatteryState, 12.0).await?; + + let signal = timeout(TIMEOUT, state_stream.next()).await?; + assert!(signal.is_some()); + assert_eq!(signal.unwrap().get().await?, "charging"); + + // Negative power below threshold -> discharging + update_single_property(&conn, &state, &PropertyType::BatteryPower, -12.0).await?; + update_single_property(&conn, &state, &PropertyType::BatteryState, -12.0).await?; + + let signal = timeout(TIMEOUT, state_stream.next()).await?; + assert!(signal.is_some()); + assert_eq!(signal.unwrap().get().await?, "discharging"); + + // Small power within threshold -> idle + update_single_property(&conn, &state, &PropertyType::BatteryPower, 3.0).await?; + update_single_property(&conn, &state, &PropertyType::BatteryState, 3.0).await?; + + let signal = timeout(TIMEOUT, state_stream.next()).await?; + assert!(signal.is_some()); + assert_eq!(signal.unwrap().get().await?, "idle"); + + Ok(()) +} + +#[tokio::test] +async fn test_solar_power_update() -> Result<()> { + let fixture = DbusTestFixture::new(); + let (conn, state) = start_service_at(fixture.address()).await?; + + let client = fixture.client_connection().await?; + let solar = SolarProxy::new(&client).await?; + + let mut stream = solar.receive_power_changed().await; + + update_single_property(&conn, &state, &PropertyType::SolarPower, 250.0).await?; + + let signal = timeout(TIMEOUT, stream.next()).await?; + assert!(signal.is_some()); + assert!((signal.unwrap().get().await? - 250.0).abs() < f64::EPSILON); + + Ok(()) +} + +#[tokio::test] +async fn test_ac_input_power_update() -> Result<()> { + let fixture = DbusTestFixture::new(); + let (conn, state) = start_service_at(fixture.address()).await?; + + let client = fixture.client_connection().await?; + let ac_input = AcInputProxy::new(&client).await?; + + let mut stream = ac_input.receive_power_changed().await; + + update_single_property(&conn, &state, &PropertyType::AcInputPower, 82.0).await?; + + let signal = timeout(TIMEOUT, stream.next()).await?; + assert!(signal.is_some()); + assert!((signal.unwrap().get().await? - 82.0).abs() < f64::EPSILON); + + Ok(()) +} + +#[tokio::test] +async fn test_ac_load_power_update() -> Result<()> { + let fixture = DbusTestFixture::new(); + let (conn, state) = start_service_at(fixture.address()).await?; + + let client = fixture.client_connection().await?; + let ac_load = AcLoadProxy::new(&client).await?; + + let mut stream = ac_load.receive_power_changed().await; + + update_single_property(&conn, &state, &PropertyType::AcLoadPower, 77.0).await?; + + let signal = timeout(TIMEOUT, stream.next()).await?; + assert!(signal.is_some()); + assert!((signal.unwrap().get().await? - 77.0).abs() < f64::EPSILON); + + Ok(()) +} + +#[tokio::test] +async fn test_config_reload_method() -> Result<()> { + let fixture = DbusTestFixture::new(); + let (_conn, state) = start_service_at(fixture.address()).await?; + + let client = fixture.client_connection().await?; + let config = ConfigProxy::new(&client).await?; + + // Spawn a listener for the notify signal + let notify = state.config_reload_notify.clone(); + let listener = tokio::spawn(async move { notify.notified().await }); + + // Call reload via D-Bus + config.reload().await?; + + // The listener should complete within the timeout + timeout(TIMEOUT, listener).await??; + + Ok(()) +} + +#[tokio::test] +async fn test_property_change_signal_emitted() -> Result<()> { + let fixture = DbusTestFixture::new(); + let (conn, state) = start_service_at(fixture.address()).await?; + + let client = fixture.client_connection().await?; + let battery = BatteryProxy::new(&client).await?; + + // Subscribe to soc property changes before making updates + let mut stream = battery.receive_soc_changed().await; + + // Update the property + update_single_property(&conn, &state, &PropertyType::BatterySoc, 50.0).await?; + + // Should receive the change signal + let signal = timeout(TIMEOUT, stream.next()).await?; + assert!(signal.is_some()); + let changed = signal.unwrap(); + assert!((changed.get().await? - 50.0).abs() < f64::EPSILON); + + Ok(()) +} diff --git a/service/tests/mqtt_to_dbus_e2e.rs b/service/tests/mqtt_to_dbus_e2e.rs new file mode 100644 index 0000000..3aebc40 --- /dev/null +++ b/service/tests/mqtt_to_dbus_e2e.rs @@ -0,0 +1,641 @@ +//! End-to-end tests: MQTT → Service → D-Bus pipeline. +//! +//! Each test spins up an embedded `rumqttd` broker and an isolated D-Bus daemon, +//! then exercises the real service code path from MQTT publish through to D-Bus +//! property reads. + +use std::collections::HashMap; +use std::net::{SocketAddr, TcpStream}; +use std::thread; +use std::time::Duration; + +use anyhow::Result; +use camper_widget_refresh::config::Config; +use camper_widget_refresh::dbus::start_service_at; +use camper_widget_refresh::victron_mqtt::{ + build_mqtt_options, read_values_and_update_properties_immediately, wait_for_serial, +}; +use rumqttc::{AsyncClient, EventLoop, MqttOptions, QoS}; +use rumqttd::{Broker, ConnectionSettings, RouterConfig, ServerSettings}; +use zbus::{Connection, connection, proxy}; + +// ---------- Client-side proxy definitions (same pattern as dbus_integration.rs) ---------- + +#[proxy( + interface = "org.craftknight.CamperWidget.Status", + default_service = "org.craftknight.CamperWidget", + default_path = "/org/craftknight/camper_widget" +)] +trait Status { + #[zbus(property)] + fn connected(&self) -> zbus::Result<bool>; +} + +#[proxy( + interface = "org.craftknight.CamperWidget.Battery", + default_service = "org.craftknight.CamperWidget", + default_path = "/org/craftknight/camper_widget/Battery" +)] +trait Battery { + #[zbus(property)] + fn soc(&self) -> zbus::Result<f64>; + + #[zbus(property)] + fn power(&self) -> zbus::Result<f64>; + + #[zbus(property)] + fn state(&self) -> zbus::Result<String>; +} + +#[proxy( + interface = "org.craftknight.CamperWidget.Solar", + default_service = "org.craftknight.CamperWidget", + default_path = "/org/craftknight/camper_widget/Solar" +)] +trait Solar { + #[zbus(property)] + fn power(&self) -> zbus::Result<f64>; +} + +#[proxy( + interface = "org.craftknight.CamperWidget.AcInput", + default_service = "org.craftknight.CamperWidget", + default_path = "/org/craftknight/camper_widget/AcInput" +)] +trait AcInput { + #[zbus(property)] + fn power(&self) -> zbus::Result<f64>; +} + +#[proxy( + interface = "org.craftknight.CamperWidget.AcLoad", + default_service = "org.craftknight.CamperWidget", + default_path = "/org/craftknight/camper_widget/AcLoad" +)] +trait AcLoad { + #[zbus(property)] + fn power(&self) -> zbus::Result<f64>; +} + +// ---------- Test fixture ---------- + +struct E2ETestFixture { + _dbus_daemon: dbus_launch::Daemon, + dbus_address: String, + mqtt_port: u16, + _broker_handle: thread::JoinHandle<()>, +} + +impl E2ETestFixture { + fn new() -> Self { + // 1. Launch isolated D-Bus daemon + let daemon = dbus_launch::Launcher::daemon() + .launch() + .expect("failed to launch dbus-daemon — is dbus-daemon installed?"); + let dbus_address = daemon.address().to_string(); + + // 2. Pick a random free port for the MQTT broker + let listener = + std::net::TcpListener::bind("127.0.0.1:0").expect("failed to bind ephemeral port"); + let mqtt_port = listener.local_addr().unwrap().port(); + drop(listener); + + // 3. Build rumqttd config programmatically + let mut server_settings = HashMap::new(); + server_settings.insert( + "test".to_string(), + ServerSettings { + name: "test".to_string(), + listen: SocketAddr::from(([127, 0, 0, 1], mqtt_port)), + tls: None, + next_connection_delay_ms: 0, + connections: ConnectionSettings { + connection_timeout_ms: 5000, + max_payload_size: 2048, + max_inflight_count: 100, + auth: None, + external_auth: None, + dynamic_filters: true, + }, + }, + ); + let broker_config = rumqttd::Config { + router: RouterConfig { + max_connections: 10, + max_outgoing_packet_count: 200, + max_segment_size: 10240, + max_segment_count: 10, + ..Default::default() + }, + v4: Some(server_settings), + ..Default::default() + }; + + // 4. Spawn broker in a dedicated OS thread (blocking call) + let broker_handle = thread::spawn(move || { + let mut broker = Broker::new(broker_config); + broker.start().expect("broker failed to start"); + }); + + // 5. Wait for broker to be ready + Self::wait_for_broker(mqtt_port, Duration::from_secs(2)); + + Self { + _dbus_daemon: daemon, + dbus_address, + mqtt_port, + _broker_handle: broker_handle, + } + } + + fn dbus_address(&self) -> &str { + &self.dbus_address + } + + fn mqtt_port(&self) -> u16 { + self.mqtt_port + } + + async fn dbus_client_connection(&self) -> Result<Connection> { + let conn = connection::Builder::address(self.dbus_address())? + .build() + .await?; + Ok(conn) + } + + fn create_publisher_client(&self, id: &str) -> (AsyncClient, EventLoop) { + let opts = MqttOptions::new(id, "127.0.0.1", self.mqtt_port()); + AsyncClient::new(opts, 10) + } + + fn test_config(&self) -> Config { + Config { + endpoints: vec![("127.0.0.1".to_string(), self.mqtt_port())], + username: None, + password: None, + client_id: "e2e-service-client".to_string(), + refresh_interval_seconds: 60, + } + } + + fn wait_for_broker(port: u16, timeout: Duration) { + let start = std::time::Instant::now(); + loop { + if TcpStream::connect(SocketAddr::from(([127, 0, 0, 1], port))).is_ok() { + return; + } + if start.elapsed() > timeout { + panic!( + "MQTT broker did not become ready within {}ms on port {}", + timeout.as_millis(), + port + ); + } + thread::sleep(Duration::from_millis(50)); + } + } +} + +// ---------- Helper: publish Victron topics ---------- + +const TEST_SERIAL: &str = "e2e_test_serial"; + +/// Victron battery state codes +const VICTRON_STATE_IDLE: i64 = 0; +const VICTRON_STATE_CHARGING: i64 = 1; +const VICTRON_STATE_DISCHARGING: i64 = 2; + +/// Build a Batteries JSON payload matching the Victron format. +fn batteries_json(soc: f64, power: f64, state: i64) -> String { + format!( + r#"{{"value":[{{"soc":{},"power":{},"state":{}}}]}}"#, + soc, power, state + ) +} + +async fn publish_all_topics( + client: &AsyncClient, + soc: f64, + power: f64, + battery_state: i64, + solar: f64, + ac_input: f64, + ac_load: f64, +) -> Result<()> { + client + .publish( + format!("N/{TEST_SERIAL}/system/0/Batteries"), + QoS::AtLeastOnce, + false, + batteries_json(soc, power, battery_state), + ) + .await?; + client + .publish( + format!("N/{TEST_SERIAL}/solarcharger/277/Yield/Power"), + QoS::AtLeastOnce, + false, + solar.to_string(), + ) + .await?; + client + .publish( + format!("N/{TEST_SERIAL}/system/0/Ac/Grid/L1/Power"), + QoS::AtLeastOnce, + false, + format!(r#"{{"value":{ac_input}}}"#), + ) + .await?; + client + .publish( + format!("N/{TEST_SERIAL}/system/0/Ac/Consumption/L1/Power"), + QoS::AtLeastOnce, + false, + format!(r#"{{"value":{ac_load}}}"#), + ) + .await?; + Ok(()) +} + +// ---------- Tests ---------- + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_mqtt_serial_discovery() -> Result<()> { + let fixture = E2ETestFixture::new(); + + // Create a publisher that will announce the serial + let (pub_client, mut pub_eventloop) = fixture.create_publisher_client("serial-pub"); + tokio::spawn(async move { while (pub_eventloop.poll().await).is_ok() {} }); + + // Create service client for serial discovery + let cfg = fixture.test_config(); + let opts = build_mqtt_options(&cfg, "127.0.0.1", fixture.mqtt_port()); + let (svc_client, mut svc_eventloop) = AsyncClient::new(opts, 10); + + // Spawn wait_for_serial first (it subscribes then polls for the serial topic) + let svc_client_clone = svc_client.clone(); + let discovery = + tokio::spawn(async move { wait_for_serial(&svc_client_clone, &mut svc_eventloop).await }); + + // Give the service client time to connect and subscribe + tokio::time::sleep(Duration::from_millis(500)).await; + + // Now publish the serial announcement + pub_client + .publish( + format!("N/{TEST_SERIAL}/system/0/Serial"), + QoS::AtLeastOnce, + false, + TEST_SERIAL, + ) + .await?; + + let discovered = tokio::time::timeout(Duration::from_secs(5), discovery).await???; + assert_eq!(discovered, TEST_SERIAL); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_mqtt_all_properties_to_dbus() -> Result<()> { + let fixture = E2ETestFixture::new(); + + // Start D-Bus service + let (conn, dbus_state) = start_service_at(fixture.dbus_address()).await?; + + // Create publisher + let (pub_client, mut pub_eventloop) = fixture.create_publisher_client("props-pub"); + tokio::spawn(async move { while (pub_eventloop.poll().await).is_ok() {} }); + + // Create service MQTT client + let cfg = fixture.test_config(); + let opts = build_mqtt_options(&cfg, "127.0.0.1", fixture.mqtt_port()); + let (svc_client, mut svc_eventloop) = AsyncClient::new(opts, 10); + + // Spawn the service reader that subscribes and updates D-Bus properties + let conn_clone = conn.clone(); + let dbus_state_clone = dbus_state.clone(); + let reader = tokio::spawn(async move { + read_values_and_update_properties_immediately( + &svc_client, + &mut svc_eventloop, + TEST_SERIAL, + &conn_clone, + &dbus_state_clone, + ) + .await + }); + + // Give the reader time to subscribe + tokio::time::sleep(Duration::from_millis(300)).await; + + // Publish Batteries JSON + solar + AC topics + publish_all_topics( + &pub_client, + 78.9, + 25.5, + VICTRON_STATE_CHARGING, + 310.0, + 82.0, + 77.0, + ) + .await?; + + // Wait for the reader to finish (all topics received) + let all_received = tokio::time::timeout(Duration::from_secs(10), reader).await???; + assert!(all_received, "expected all topics to be received"); + + // Small delay for signal propagation + tokio::time::sleep(Duration::from_millis(100)).await; + + // Read D-Bus properties via proxies + let client = fixture.dbus_client_connection().await?; + let battery = BatteryProxy::new(&client).await?; + let solar = SolarProxy::new(&client).await?; + + assert!((battery.soc().await? - 78.9).abs() < f64::EPSILON); + assert!((battery.power().await? - 25.5).abs() < f64::EPSILON); + assert!((solar.power().await? - 310.0).abs() < f64::EPSILON); + assert_eq!(battery.state().await?, "charging"); + + let ac_input = AcInputProxy::new(&client).await?; + let ac_load = AcLoadProxy::new(&client).await?; + assert!((ac_input.power().await? - 82.0).abs() < f64::EPSILON); + assert!((ac_load.power().await? - 77.0).abs() < f64::EPSILON); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_mqtt_battery_state_charging() -> Result<()> { + let fixture = E2ETestFixture::new(); + let (conn, dbus_state) = start_service_at(fixture.dbus_address()).await?; + + let (pub_client, mut pub_eventloop) = fixture.create_publisher_client("charge-pub"); + tokio::spawn(async move { while (pub_eventloop.poll().await).is_ok() {} }); + + let cfg = fixture.test_config(); + let opts = build_mqtt_options(&cfg, "127.0.0.1", fixture.mqtt_port()); + let (svc_client, mut svc_eventloop) = AsyncClient::new(opts, 10); + + let conn_clone = conn.clone(); + let dbus_state_clone = dbus_state.clone(); + let reader = tokio::spawn(async move { + read_values_and_update_properties_immediately( + &svc_client, + &mut svc_eventloop, + TEST_SERIAL, + &conn_clone, + &dbus_state_clone, + ) + .await + }); + + tokio::time::sleep(Duration::from_millis(300)).await; + publish_all_topics( + &pub_client, + 60.0, + 15.0, + VICTRON_STATE_CHARGING, + 200.0, + 100.0, + 90.0, + ) + .await?; + + let all_received = tokio::time::timeout(Duration::from_secs(10), reader).await???; + assert!(all_received); + + tokio::time::sleep(Duration::from_millis(100)).await; + + let client = fixture.dbus_client_connection().await?; + let battery = BatteryProxy::new(&client).await?; + assert_eq!(battery.state().await?, "charging"); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_mqtt_battery_state_discharging() -> Result<()> { + let fixture = E2ETestFixture::new(); + let (conn, dbus_state) = start_service_at(fixture.dbus_address()).await?; + + let (pub_client, mut pub_eventloop) = fixture.create_publisher_client("discharge-pub"); + tokio::spawn(async move { while (pub_eventloop.poll().await).is_ok() {} }); + + let cfg = fixture.test_config(); + let opts = build_mqtt_options(&cfg, "127.0.0.1", fixture.mqtt_port()); + let (svc_client, mut svc_eventloop) = AsyncClient::new(opts, 10); + + let conn_clone = conn.clone(); + let dbus_state_clone = dbus_state.clone(); + let reader = tokio::spawn(async move { + read_values_and_update_properties_immediately( + &svc_client, + &mut svc_eventloop, + TEST_SERIAL, + &conn_clone, + &dbus_state_clone, + ) + .await + }); + + tokio::time::sleep(Duration::from_millis(300)).await; + publish_all_topics( + &pub_client, + 45.0, + -20.0, + VICTRON_STATE_DISCHARGING, + 0.0, + 0.0, + 50.0, + ) + .await?; + + let all_received = tokio::time::timeout(Duration::from_secs(10), reader).await???; + assert!(all_received); + + tokio::time::sleep(Duration::from_millis(100)).await; + + let client = fixture.dbus_client_connection().await?; + let battery = BatteryProxy::new(&client).await?; + assert_eq!(battery.state().await?, "discharging"); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_mqtt_battery_state_idle() -> Result<()> { + let fixture = E2ETestFixture::new(); + let (conn, dbus_state) = start_service_at(fixture.dbus_address()).await?; + + let (pub_client, mut pub_eventloop) = fixture.create_publisher_client("idle-pub"); + tokio::spawn(async move { while (pub_eventloop.poll().await).is_ok() {} }); + + let cfg = fixture.test_config(); + let opts = build_mqtt_options(&cfg, "127.0.0.1", fixture.mqtt_port()); + let (svc_client, mut svc_eventloop) = AsyncClient::new(opts, 10); + + let conn_clone = conn.clone(); + let dbus_state_clone = dbus_state.clone(); + let reader = tokio::spawn(async move { + read_values_and_update_properties_immediately( + &svc_client, + &mut svc_eventloop, + TEST_SERIAL, + &conn_clone, + &dbus_state_clone, + ) + .await + }); + + tokio::time::sleep(Duration::from_millis(300)).await; + publish_all_topics(&pub_client, 80.0, 2.0, VICTRON_STATE_IDLE, 50.0, 0.0, 30.0).await?; + + let all_received = tokio::time::timeout(Duration::from_secs(10), reader).await???; + assert!(all_received); + + tokio::time::sleep(Duration::from_millis(100)).await; + + let client = fixture.dbus_client_connection().await?; + let battery = BatteryProxy::new(&client).await?; + assert_eq!(battery.state().await?, "idle"); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_mqtt_json_payload_format() -> Result<()> { + let fixture = E2ETestFixture::new(); + let (conn, dbus_state) = start_service_at(fixture.dbus_address()).await?; + + let (pub_client, mut pub_eventloop) = fixture.create_publisher_client("json-pub"); + tokio::spawn(async move { while (pub_eventloop.poll().await).is_ok() {} }); + + let cfg = fixture.test_config(); + let opts = build_mqtt_options(&cfg, "127.0.0.1", fixture.mqtt_port()); + let (svc_client, mut svc_eventloop) = AsyncClient::new(opts, 10); + + let conn_clone = conn.clone(); + let dbus_state_clone = dbus_state.clone(); + let reader = tokio::spawn(async move { + read_values_and_update_properties_immediately( + &svc_client, + &mut svc_eventloop, + TEST_SERIAL, + &conn_clone, + &dbus_state_clone, + ) + .await + }); + + tokio::time::sleep(Duration::from_millis(300)).await; + + // Publish Batteries with JSON value format + solar with {"value": <number>} + AC topics + pub_client + .publish( + format!("N/{TEST_SERIAL}/system/0/Batteries"), + QoS::AtLeastOnce, + false, + batteries_json(225.5, 30.0, VICTRON_STATE_CHARGING), + ) + .await?; + pub_client + .publish( + format!("N/{TEST_SERIAL}/solarcharger/277/Yield/Power"), + QoS::AtLeastOnce, + false, + r#"{"value": 450.0}"#, + ) + .await?; + pub_client + .publish( + format!("N/{TEST_SERIAL}/system/0/Ac/Grid/L1/Power"), + QoS::AtLeastOnce, + false, + r#"{"value": 120.0}"#, + ) + .await?; + pub_client + .publish( + format!("N/{TEST_SERIAL}/system/0/Ac/Consumption/L1/Power"), + QoS::AtLeastOnce, + false, + r#"{"value": 95.0}"#, + ) + .await?; + + let all_received = tokio::time::timeout(Duration::from_secs(10), reader).await???; + assert!(all_received, "expected all topics to be received"); + + tokio::time::sleep(Duration::from_millis(100)).await; + + let client = fixture.dbus_client_connection().await?; + let battery = BatteryProxy::new(&client).await?; + let solar = SolarProxy::new(&client).await?; + let ac_input = AcInputProxy::new(&client).await?; + let ac_load = AcLoadProxy::new(&client).await?; + + assert!((battery.soc().await? - 225.5).abs() < f64::EPSILON); + assert!((battery.power().await? - 30.0).abs() < f64::EPSILON); + assert!((solar.power().await? - 450.0).abs() < f64::EPSILON); + assert!((ac_input.power().await? - 120.0).abs() < f64::EPSILON); + assert!((ac_load.power().await? - 95.0).abs() < f64::EPSILON); + + Ok(()) +} + +#[ignore] // Takes ~30s due to internal timeout; run with `cargo test -- --ignored` +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn test_mqtt_timeout_with_partial_data() -> Result<()> { + let fixture = E2ETestFixture::new(); + let (conn, dbus_state) = start_service_at(fixture.dbus_address()).await?; + + let (pub_client, mut pub_eventloop) = fixture.create_publisher_client("partial-pub"); + tokio::spawn(async move { while (pub_eventloop.poll().await).is_ok() {} }); + + let cfg = fixture.test_config(); + let opts = build_mqtt_options(&cfg, "127.0.0.1", fixture.mqtt_port()); + let (svc_client, mut svc_eventloop) = AsyncClient::new(opts, 10); + + let conn_clone = conn.clone(); + let dbus_state_clone = dbus_state.clone(); + let reader = tokio::spawn(async move { + read_values_and_update_properties_immediately( + &svc_client, + &mut svc_eventloop, + TEST_SERIAL, + &conn_clone, + &dbus_state_clone, + ) + .await + }); + + tokio::time::sleep(Duration::from_millis(300)).await; + + // Only publish Batteries (omit solar + AC) — 1 of 4 topics + pub_client + .publish( + format!("N/{TEST_SERIAL}/system/0/Batteries"), + QoS::AtLeastOnce, + false, + batteries_json(55.0, 10.0, VICTRON_STATE_CHARGING), + ) + .await?; + + // Should return false after the 30s internal timeout + let all_received = tokio::time::timeout(Duration::from_secs(35), reader).await???; + assert!(!all_received, "expected timeout with partial data"); + + // The received battery properties should be set on D-Bus + let client = fixture.dbus_client_connection().await?; + let battery = BatteryProxy::new(&client).await?; + + assert!((battery.soc().await? - 55.0).abs() < f64::EPSILON); + assert!((battery.power().await? - 10.0).abs() < f64::EPSILON); + // Solar should still be at default (0.0) + let solar = SolarProxy::new(&client).await?; + assert!((solar.power().await? - 0.0).abs() < f64::EPSILON); + + Ok(()) +} |
