//! End-to-end tests: MQTT → Service → D-Bus pipeline. //! //! Each test spins up an embedded `rumqttd` broker and an isolated D-Bus daemon, //! then exercises the real service code path from MQTT publish through to D-Bus //! property reads. use std::collections::HashMap; use std::net::{SocketAddr, TcpStream}; use std::thread; use std::time::Duration; use anyhow::Result; use camper_widget_refresh::config::Config; use camper_widget_refresh::dbus::start_service_at; use camper_widget_refresh::victron_mqtt::{ build_mqtt_options, read_values_and_update_properties_immediately, wait_for_serial, }; use rumqttc::{AsyncClient, EventLoop, MqttOptions, QoS}; use rumqttd::{Broker, ConnectionSettings, RouterConfig, ServerSettings}; use zbus::{Connection, connection, proxy}; // ---------- Client-side proxy definitions (same pattern as dbus_integration.rs) ---------- #[proxy( interface = "org.craftknight.CamperWidget.Status", default_service = "org.craftknight.CamperWidget", default_path = "/org/craftknight/camper_widget" )] trait Status { #[zbus(property)] fn connected(&self) -> zbus::Result; } #[proxy( interface = "org.craftknight.CamperWidget.Battery", default_service = "org.craftknight.CamperWidget", default_path = "/org/craftknight/camper_widget/Battery" )] trait Battery { #[zbus(property)] fn soc(&self) -> zbus::Result; #[zbus(property)] fn power(&self) -> zbus::Result; #[zbus(property)] fn state(&self) -> zbus::Result; } #[proxy( interface = "org.craftknight.CamperWidget.Solar", default_service = "org.craftknight.CamperWidget", default_path = "/org/craftknight/camper_widget/Solar" )] trait Solar { #[zbus(property)] fn power(&self) -> zbus::Result; } #[proxy( interface = "org.craftknight.CamperWidget.AcInput", default_service = "org.craftknight.CamperWidget", default_path = "/org/craftknight/camper_widget/AcInput" )] trait AcInput { #[zbus(property)] fn power(&self) -> zbus::Result; } #[proxy( interface = "org.craftknight.CamperWidget.AcLoad", default_service = "org.craftknight.CamperWidget", default_path = "/org/craftknight/camper_widget/AcLoad" )] trait AcLoad { #[zbus(property)] fn power(&self) -> zbus::Result; } // ---------- Test fixture ---------- struct E2ETestFixture { _dbus_daemon: dbus_launch::Daemon, dbus_address: String, mqtt_port: u16, _broker_handle: thread::JoinHandle<()>, } impl E2ETestFixture { fn new() -> Self { // 1. Launch isolated D-Bus daemon let daemon = dbus_launch::Launcher::daemon() .launch() .expect("failed to launch dbus-daemon — is dbus-daemon installed?"); let dbus_address = daemon.address().to_string(); // 2. Pick a random free port for the MQTT broker let listener = std::net::TcpListener::bind("127.0.0.1:0").expect("failed to bind ephemeral port"); let mqtt_port = listener.local_addr().unwrap().port(); drop(listener); // 3. Build rumqttd config programmatically let mut server_settings = HashMap::new(); server_settings.insert( "test".to_string(), ServerSettings { name: "test".to_string(), listen: SocketAddr::from(([127, 0, 0, 1], mqtt_port)), tls: None, next_connection_delay_ms: 0, connections: ConnectionSettings { connection_timeout_ms: 5000, max_payload_size: 2048, max_inflight_count: 100, auth: None, external_auth: None, dynamic_filters: true, }, }, ); let broker_config = rumqttd::Config { router: RouterConfig { max_connections: 10, max_outgoing_packet_count: 200, max_segment_size: 10240, max_segment_count: 10, ..Default::default() }, v4: Some(server_settings), ..Default::default() }; // 4. Spawn broker in a dedicated OS thread (blocking call) let broker_handle = thread::spawn(move || { let mut broker = Broker::new(broker_config); broker.start().expect("broker failed to start"); }); // 5. Wait for broker to be ready Self::wait_for_broker(mqtt_port, Duration::from_secs(2)); Self { _dbus_daemon: daemon, dbus_address, mqtt_port, _broker_handle: broker_handle, } } fn dbus_address(&self) -> &str { &self.dbus_address } fn mqtt_port(&self) -> u16 { self.mqtt_port } async fn dbus_client_connection(&self) -> Result { let conn = connection::Builder::address(self.dbus_address())? .build() .await?; Ok(conn) } fn create_publisher_client(&self, id: &str) -> (AsyncClient, EventLoop) { let opts = MqttOptions::new(id, "127.0.0.1", self.mqtt_port()); AsyncClient::new(opts, 10) } fn test_config(&self) -> Config { Config { endpoints: vec![("127.0.0.1".to_string(), self.mqtt_port())], username: None, password: None, client_id: "e2e-service-client".to_string(), refresh_interval_seconds: 60, } } fn wait_for_broker(port: u16, timeout: Duration) { let start = std::time::Instant::now(); loop { if TcpStream::connect(SocketAddr::from(([127, 0, 0, 1], port))).is_ok() { return; } if start.elapsed() > timeout { panic!( "MQTT broker did not become ready within {}ms on port {}", timeout.as_millis(), port ); } thread::sleep(Duration::from_millis(50)); } } } // ---------- Helper: publish Victron topics ---------- const TEST_SERIAL: &str = "e2e_test_serial"; /// Victron battery state codes const VICTRON_STATE_IDLE: i64 = 0; const VICTRON_STATE_CHARGING: i64 = 1; const VICTRON_STATE_DISCHARGING: i64 = 2; /// Build a Batteries JSON payload matching the Victron format. fn batteries_json(soc: f64, power: f64, state: i64) -> String { format!( r#"{{"value":[{{"soc":{},"power":{},"state":{}}}]}}"#, soc, power, state ) } async fn publish_all_topics( client: &AsyncClient, soc: f64, power: f64, battery_state: i64, solar: f64, ac_input: f64, ac_load: f64, ) -> Result<()> { client .publish( format!("N/{TEST_SERIAL}/system/0/Batteries"), QoS::AtLeastOnce, false, batteries_json(soc, power, battery_state), ) .await?; client .publish( format!("N/{TEST_SERIAL}/solarcharger/277/Yield/Power"), QoS::AtLeastOnce, false, solar.to_string(), ) .await?; client .publish( format!("N/{TEST_SERIAL}/system/0/Ac/Grid/L1/Power"), QoS::AtLeastOnce, false, format!(r#"{{"value":{ac_input}}}"#), ) .await?; client .publish( format!("N/{TEST_SERIAL}/system/0/Ac/Consumption/L1/Power"), QoS::AtLeastOnce, false, format!(r#"{{"value":{ac_load}}}"#), ) .await?; Ok(()) } // ---------- Tests ---------- #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_mqtt_serial_discovery() -> Result<()> { let fixture = E2ETestFixture::new(); // Create a publisher that will announce the serial let (pub_client, mut pub_eventloop) = fixture.create_publisher_client("serial-pub"); tokio::spawn(async move { while (pub_eventloop.poll().await).is_ok() {} }); // Create service client for serial discovery let cfg = fixture.test_config(); let opts = build_mqtt_options(&cfg, "127.0.0.1", fixture.mqtt_port()); let (svc_client, mut svc_eventloop) = AsyncClient::new(opts, 10); // Spawn wait_for_serial first (it subscribes then polls for the serial topic) let svc_client_clone = svc_client.clone(); let discovery = tokio::spawn(async move { wait_for_serial(&svc_client_clone, &mut svc_eventloop).await }); // Give the service client time to connect and subscribe tokio::time::sleep(Duration::from_millis(500)).await; // Now publish the serial announcement pub_client .publish( format!("N/{TEST_SERIAL}/system/0/Serial"), QoS::AtLeastOnce, false, TEST_SERIAL, ) .await?; let discovered = tokio::time::timeout(Duration::from_secs(5), discovery).await???; assert_eq!(discovered, TEST_SERIAL); Ok(()) } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_mqtt_all_properties_to_dbus() -> Result<()> { let fixture = E2ETestFixture::new(); // Start D-Bus service let (conn, dbus_state) = start_service_at(fixture.dbus_address()).await?; // Create publisher let (pub_client, mut pub_eventloop) = fixture.create_publisher_client("props-pub"); tokio::spawn(async move { while (pub_eventloop.poll().await).is_ok() {} }); // Create service MQTT client let cfg = fixture.test_config(); let opts = build_mqtt_options(&cfg, "127.0.0.1", fixture.mqtt_port()); let (svc_client, mut svc_eventloop) = AsyncClient::new(opts, 10); // Spawn the service reader that subscribes and updates D-Bus properties let conn_clone = conn.clone(); let dbus_state_clone = dbus_state.clone(); let reader = tokio::spawn(async move { read_values_and_update_properties_immediately( &svc_client, &mut svc_eventloop, TEST_SERIAL, &conn_clone, &dbus_state_clone, ) .await }); // Give the reader time to subscribe tokio::time::sleep(Duration::from_millis(300)).await; // Publish Batteries JSON + solar + AC topics publish_all_topics( &pub_client, 78.9, 25.5, VICTRON_STATE_CHARGING, 310.0, 82.0, 77.0, ) .await?; // Wait for the reader to finish (all topics received) let all_received = tokio::time::timeout(Duration::from_secs(10), reader).await???; assert!(all_received, "expected all topics to be received"); // Small delay for signal propagation tokio::time::sleep(Duration::from_millis(100)).await; // Read D-Bus properties via proxies let client = fixture.dbus_client_connection().await?; let battery = BatteryProxy::new(&client).await?; let solar = SolarProxy::new(&client).await?; assert!((battery.soc().await? - 78.9).abs() < f64::EPSILON); assert!((battery.power().await? - 25.5).abs() < f64::EPSILON); assert!((solar.power().await? - 310.0).abs() < f64::EPSILON); assert_eq!(battery.state().await?, "charging"); let ac_input = AcInputProxy::new(&client).await?; let ac_load = AcLoadProxy::new(&client).await?; assert!((ac_input.power().await? - 82.0).abs() < f64::EPSILON); assert!((ac_load.power().await? - 77.0).abs() < f64::EPSILON); Ok(()) } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_mqtt_battery_state_charging() -> Result<()> { let fixture = E2ETestFixture::new(); let (conn, dbus_state) = start_service_at(fixture.dbus_address()).await?; let (pub_client, mut pub_eventloop) = fixture.create_publisher_client("charge-pub"); tokio::spawn(async move { while (pub_eventloop.poll().await).is_ok() {} }); let cfg = fixture.test_config(); let opts = build_mqtt_options(&cfg, "127.0.0.1", fixture.mqtt_port()); let (svc_client, mut svc_eventloop) = AsyncClient::new(opts, 10); let conn_clone = conn.clone(); let dbus_state_clone = dbus_state.clone(); let reader = tokio::spawn(async move { read_values_and_update_properties_immediately( &svc_client, &mut svc_eventloop, TEST_SERIAL, &conn_clone, &dbus_state_clone, ) .await }); tokio::time::sleep(Duration::from_millis(300)).await; publish_all_topics( &pub_client, 60.0, 15.0, VICTRON_STATE_CHARGING, 200.0, 100.0, 90.0, ) .await?; let all_received = tokio::time::timeout(Duration::from_secs(10), reader).await???; assert!(all_received); tokio::time::sleep(Duration::from_millis(100)).await; let client = fixture.dbus_client_connection().await?; let battery = BatteryProxy::new(&client).await?; assert_eq!(battery.state().await?, "charging"); Ok(()) } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_mqtt_battery_state_discharging() -> Result<()> { let fixture = E2ETestFixture::new(); let (conn, dbus_state) = start_service_at(fixture.dbus_address()).await?; let (pub_client, mut pub_eventloop) = fixture.create_publisher_client("discharge-pub"); tokio::spawn(async move { while (pub_eventloop.poll().await).is_ok() {} }); let cfg = fixture.test_config(); let opts = build_mqtt_options(&cfg, "127.0.0.1", fixture.mqtt_port()); let (svc_client, mut svc_eventloop) = AsyncClient::new(opts, 10); let conn_clone = conn.clone(); let dbus_state_clone = dbus_state.clone(); let reader = tokio::spawn(async move { read_values_and_update_properties_immediately( &svc_client, &mut svc_eventloop, TEST_SERIAL, &conn_clone, &dbus_state_clone, ) .await }); tokio::time::sleep(Duration::from_millis(300)).await; publish_all_topics( &pub_client, 45.0, -20.0, VICTRON_STATE_DISCHARGING, 0.0, 0.0, 50.0, ) .await?; let all_received = tokio::time::timeout(Duration::from_secs(10), reader).await???; assert!(all_received); tokio::time::sleep(Duration::from_millis(100)).await; let client = fixture.dbus_client_connection().await?; let battery = BatteryProxy::new(&client).await?; assert_eq!(battery.state().await?, "discharging"); Ok(()) } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_mqtt_battery_state_idle() -> Result<()> { let fixture = E2ETestFixture::new(); let (conn, dbus_state) = start_service_at(fixture.dbus_address()).await?; let (pub_client, mut pub_eventloop) = fixture.create_publisher_client("idle-pub"); tokio::spawn(async move { while (pub_eventloop.poll().await).is_ok() {} }); let cfg = fixture.test_config(); let opts = build_mqtt_options(&cfg, "127.0.0.1", fixture.mqtt_port()); let (svc_client, mut svc_eventloop) = AsyncClient::new(opts, 10); let conn_clone = conn.clone(); let dbus_state_clone = dbus_state.clone(); let reader = tokio::spawn(async move { read_values_and_update_properties_immediately( &svc_client, &mut svc_eventloop, TEST_SERIAL, &conn_clone, &dbus_state_clone, ) .await }); tokio::time::sleep(Duration::from_millis(300)).await; publish_all_topics(&pub_client, 80.0, 2.0, VICTRON_STATE_IDLE, 50.0, 0.0, 30.0).await?; let all_received = tokio::time::timeout(Duration::from_secs(10), reader).await???; assert!(all_received); tokio::time::sleep(Duration::from_millis(100)).await; let client = fixture.dbus_client_connection().await?; let battery = BatteryProxy::new(&client).await?; assert_eq!(battery.state().await?, "idle"); Ok(()) } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_mqtt_json_payload_format() -> Result<()> { let fixture = E2ETestFixture::new(); let (conn, dbus_state) = start_service_at(fixture.dbus_address()).await?; let (pub_client, mut pub_eventloop) = fixture.create_publisher_client("json-pub"); tokio::spawn(async move { while (pub_eventloop.poll().await).is_ok() {} }); let cfg = fixture.test_config(); let opts = build_mqtt_options(&cfg, "127.0.0.1", fixture.mqtt_port()); let (svc_client, mut svc_eventloop) = AsyncClient::new(opts, 10); let conn_clone = conn.clone(); let dbus_state_clone = dbus_state.clone(); let reader = tokio::spawn(async move { read_values_and_update_properties_immediately( &svc_client, &mut svc_eventloop, TEST_SERIAL, &conn_clone, &dbus_state_clone, ) .await }); tokio::time::sleep(Duration::from_millis(300)).await; // Publish Batteries with JSON value format + solar with {"value": } + AC topics pub_client .publish( format!("N/{TEST_SERIAL}/system/0/Batteries"), QoS::AtLeastOnce, false, batteries_json(225.5, 30.0, VICTRON_STATE_CHARGING), ) .await?; pub_client .publish( format!("N/{TEST_SERIAL}/solarcharger/277/Yield/Power"), QoS::AtLeastOnce, false, r#"{"value": 450.0}"#, ) .await?; pub_client .publish( format!("N/{TEST_SERIAL}/system/0/Ac/Grid/L1/Power"), QoS::AtLeastOnce, false, r#"{"value": 120.0}"#, ) .await?; pub_client .publish( format!("N/{TEST_SERIAL}/system/0/Ac/Consumption/L1/Power"), QoS::AtLeastOnce, false, r#"{"value": 95.0}"#, ) .await?; let all_received = tokio::time::timeout(Duration::from_secs(10), reader).await???; assert!(all_received, "expected all topics to be received"); tokio::time::sleep(Duration::from_millis(100)).await; let client = fixture.dbus_client_connection().await?; let battery = BatteryProxy::new(&client).await?; let solar = SolarProxy::new(&client).await?; let ac_input = AcInputProxy::new(&client).await?; let ac_load = AcLoadProxy::new(&client).await?; assert!((battery.soc().await? - 225.5).abs() < f64::EPSILON); assert!((battery.power().await? - 30.0).abs() < f64::EPSILON); assert!((solar.power().await? - 450.0).abs() < f64::EPSILON); assert!((ac_input.power().await? - 120.0).abs() < f64::EPSILON); assert!((ac_load.power().await? - 95.0).abs() < f64::EPSILON); Ok(()) } #[ignore] // Takes ~30s due to internal timeout; run with `cargo test -- --ignored` #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn test_mqtt_timeout_with_partial_data() -> Result<()> { let fixture = E2ETestFixture::new(); let (conn, dbus_state) = start_service_at(fixture.dbus_address()).await?; let (pub_client, mut pub_eventloop) = fixture.create_publisher_client("partial-pub"); tokio::spawn(async move { while (pub_eventloop.poll().await).is_ok() {} }); let cfg = fixture.test_config(); let opts = build_mqtt_options(&cfg, "127.0.0.1", fixture.mqtt_port()); let (svc_client, mut svc_eventloop) = AsyncClient::new(opts, 10); let conn_clone = conn.clone(); let dbus_state_clone = dbus_state.clone(); let reader = tokio::spawn(async move { read_values_and_update_properties_immediately( &svc_client, &mut svc_eventloop, TEST_SERIAL, &conn_clone, &dbus_state_clone, ) .await }); tokio::time::sleep(Duration::from_millis(300)).await; // Only publish Batteries (omit solar + AC) — 1 of 4 topics pub_client .publish( format!("N/{TEST_SERIAL}/system/0/Batteries"), QoS::AtLeastOnce, false, batteries_json(55.0, 10.0, VICTRON_STATE_CHARGING), ) .await?; // Should return false after the 30s internal timeout let all_received = tokio::time::timeout(Duration::from_secs(35), reader).await???; assert!(!all_received, "expected timeout with partial data"); // The received battery properties should be set on D-Bus let client = fixture.dbus_client_connection().await?; let battery = BatteryProxy::new(&client).await?; assert!((battery.soc().await? - 55.0).abs() < f64::EPSILON); assert!((battery.power().await? - 10.0).abs() < f64::EPSILON); // Solar should still be at default (0.0) let solar = SolarProxy::new(&client).await?; assert!((solar.power().await? - 0.0).abs() < f64::EPSILON); Ok(()) }