summaryrefslogtreecommitdiff
path: root/service/tests/mqtt_to_dbus_e2e.rs
diff options
context:
space:
mode:
Diffstat (limited to 'service/tests/mqtt_to_dbus_e2e.rs')
-rw-r--r--service/tests/mqtt_to_dbus_e2e.rs641
1 files changed, 641 insertions, 0 deletions
diff --git a/service/tests/mqtt_to_dbus_e2e.rs b/service/tests/mqtt_to_dbus_e2e.rs
new file mode 100644
index 0000000..3aebc40
--- /dev/null
+++ b/service/tests/mqtt_to_dbus_e2e.rs
@@ -0,0 +1,641 @@
+//! End-to-end tests: MQTT → Service → D-Bus pipeline.
+//!
+//! Each test spins up an embedded `rumqttd` broker and an isolated D-Bus daemon,
+//! then exercises the real service code path from MQTT publish through to D-Bus
+//! property reads.
+
+use std::collections::HashMap;
+use std::net::{SocketAddr, TcpStream};
+use std::thread;
+use std::time::Duration;
+
+use anyhow::Result;
+use camper_widget_refresh::config::Config;
+use camper_widget_refresh::dbus::start_service_at;
+use camper_widget_refresh::victron_mqtt::{
+ build_mqtt_options, read_values_and_update_properties_immediately, wait_for_serial,
+};
+use rumqttc::{AsyncClient, EventLoop, MqttOptions, QoS};
+use rumqttd::{Broker, ConnectionSettings, RouterConfig, ServerSettings};
+use zbus::{Connection, connection, proxy};
+
+// ---------- Client-side proxy definitions (same pattern as dbus_integration.rs) ----------
+
+#[proxy(
+ interface = "org.craftknight.CamperWidget.Status",
+ default_service = "org.craftknight.CamperWidget",
+ default_path = "/org/craftknight/camper_widget"
+)]
+trait Status {
+ #[zbus(property)]
+ fn connected(&self) -> zbus::Result<bool>;
+}
+
+#[proxy(
+ interface = "org.craftknight.CamperWidget.Battery",
+ default_service = "org.craftknight.CamperWidget",
+ default_path = "/org/craftknight/camper_widget/Battery"
+)]
+trait Battery {
+ #[zbus(property)]
+ fn soc(&self) -> zbus::Result<f64>;
+
+ #[zbus(property)]
+ fn power(&self) -> zbus::Result<f64>;
+
+ #[zbus(property)]
+ fn state(&self) -> zbus::Result<String>;
+}
+
+#[proxy(
+ interface = "org.craftknight.CamperWidget.Solar",
+ default_service = "org.craftknight.CamperWidget",
+ default_path = "/org/craftknight/camper_widget/Solar"
+)]
+trait Solar {
+ #[zbus(property)]
+ fn power(&self) -> zbus::Result<f64>;
+}
+
+#[proxy(
+ interface = "org.craftknight.CamperWidget.AcInput",
+ default_service = "org.craftknight.CamperWidget",
+ default_path = "/org/craftknight/camper_widget/AcInput"
+)]
+trait AcInput {
+ #[zbus(property)]
+ fn power(&self) -> zbus::Result<f64>;
+}
+
+#[proxy(
+ interface = "org.craftknight.CamperWidget.AcLoad",
+ default_service = "org.craftknight.CamperWidget",
+ default_path = "/org/craftknight/camper_widget/AcLoad"
+)]
+trait AcLoad {
+ #[zbus(property)]
+ fn power(&self) -> zbus::Result<f64>;
+}
+
+// ---------- Test fixture ----------
+
+struct E2ETestFixture {
+ _dbus_daemon: dbus_launch::Daemon,
+ dbus_address: String,
+ mqtt_port: u16,
+ _broker_handle: thread::JoinHandle<()>,
+}
+
+impl E2ETestFixture {
+ fn new() -> Self {
+ // 1. Launch isolated D-Bus daemon
+ let daemon = dbus_launch::Launcher::daemon()
+ .launch()
+ .expect("failed to launch dbus-daemon — is dbus-daemon installed?");
+ let dbus_address = daemon.address().to_string();
+
+ // 2. Pick a random free port for the MQTT broker
+ let listener =
+ std::net::TcpListener::bind("127.0.0.1:0").expect("failed to bind ephemeral port");
+ let mqtt_port = listener.local_addr().unwrap().port();
+ drop(listener);
+
+ // 3. Build rumqttd config programmatically
+ let mut server_settings = HashMap::new();
+ server_settings.insert(
+ "test".to_string(),
+ ServerSettings {
+ name: "test".to_string(),
+ listen: SocketAddr::from(([127, 0, 0, 1], mqtt_port)),
+ tls: None,
+ next_connection_delay_ms: 0,
+ connections: ConnectionSettings {
+ connection_timeout_ms: 5000,
+ max_payload_size: 2048,
+ max_inflight_count: 100,
+ auth: None,
+ external_auth: None,
+ dynamic_filters: true,
+ },
+ },
+ );
+ let broker_config = rumqttd::Config {
+ router: RouterConfig {
+ max_connections: 10,
+ max_outgoing_packet_count: 200,
+ max_segment_size: 10240,
+ max_segment_count: 10,
+ ..Default::default()
+ },
+ v4: Some(server_settings),
+ ..Default::default()
+ };
+
+ // 4. Spawn broker in a dedicated OS thread (blocking call)
+ let broker_handle = thread::spawn(move || {
+ let mut broker = Broker::new(broker_config);
+ broker.start().expect("broker failed to start");
+ });
+
+ // 5. Wait for broker to be ready
+ Self::wait_for_broker(mqtt_port, Duration::from_secs(2));
+
+ Self {
+ _dbus_daemon: daemon,
+ dbus_address,
+ mqtt_port,
+ _broker_handle: broker_handle,
+ }
+ }
+
+ fn dbus_address(&self) -> &str {
+ &self.dbus_address
+ }
+
+ fn mqtt_port(&self) -> u16 {
+ self.mqtt_port
+ }
+
+ async fn dbus_client_connection(&self) -> Result<Connection> {
+ let conn = connection::Builder::address(self.dbus_address())?
+ .build()
+ .await?;
+ Ok(conn)
+ }
+
+ fn create_publisher_client(&self, id: &str) -> (AsyncClient, EventLoop) {
+ let opts = MqttOptions::new(id, "127.0.0.1", self.mqtt_port());
+ AsyncClient::new(opts, 10)
+ }
+
+ fn test_config(&self) -> Config {
+ Config {
+ endpoints: vec![("127.0.0.1".to_string(), self.mqtt_port())],
+ username: None,
+ password: None,
+ client_id: "e2e-service-client".to_string(),
+ refresh_interval_seconds: 60,
+ }
+ }
+
+ fn wait_for_broker(port: u16, timeout: Duration) {
+ let start = std::time::Instant::now();
+ loop {
+ if TcpStream::connect(SocketAddr::from(([127, 0, 0, 1], port))).is_ok() {
+ return;
+ }
+ if start.elapsed() > timeout {
+ panic!(
+ "MQTT broker did not become ready within {}ms on port {}",
+ timeout.as_millis(),
+ port
+ );
+ }
+ thread::sleep(Duration::from_millis(50));
+ }
+ }
+}
+
+// ---------- Helper: publish Victron topics ----------
+
+const TEST_SERIAL: &str = "e2e_test_serial";
+
+/// Victron battery state codes
+const VICTRON_STATE_IDLE: i64 = 0;
+const VICTRON_STATE_CHARGING: i64 = 1;
+const VICTRON_STATE_DISCHARGING: i64 = 2;
+
+/// Build a Batteries JSON payload matching the Victron format.
+fn batteries_json(soc: f64, power: f64, state: i64) -> String {
+ format!(
+ r#"{{"value":[{{"soc":{},"power":{},"state":{}}}]}}"#,
+ soc, power, state
+ )
+}
+
+async fn publish_all_topics(
+ client: &AsyncClient,
+ soc: f64,
+ power: f64,
+ battery_state: i64,
+ solar: f64,
+ ac_input: f64,
+ ac_load: f64,
+) -> Result<()> {
+ client
+ .publish(
+ format!("N/{TEST_SERIAL}/system/0/Batteries"),
+ QoS::AtLeastOnce,
+ false,
+ batteries_json(soc, power, battery_state),
+ )
+ .await?;
+ client
+ .publish(
+ format!("N/{TEST_SERIAL}/solarcharger/277/Yield/Power"),
+ QoS::AtLeastOnce,
+ false,
+ solar.to_string(),
+ )
+ .await?;
+ client
+ .publish(
+ format!("N/{TEST_SERIAL}/system/0/Ac/Grid/L1/Power"),
+ QoS::AtLeastOnce,
+ false,
+ format!(r#"{{"value":{ac_input}}}"#),
+ )
+ .await?;
+ client
+ .publish(
+ format!("N/{TEST_SERIAL}/system/0/Ac/Consumption/L1/Power"),
+ QoS::AtLeastOnce,
+ false,
+ format!(r#"{{"value":{ac_load}}}"#),
+ )
+ .await?;
+ Ok(())
+}
+
+// ---------- Tests ----------
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn test_mqtt_serial_discovery() -> Result<()> {
+ let fixture = E2ETestFixture::new();
+
+ // Create a publisher that will announce the serial
+ let (pub_client, mut pub_eventloop) = fixture.create_publisher_client("serial-pub");
+ tokio::spawn(async move { while (pub_eventloop.poll().await).is_ok() {} });
+
+ // Create service client for serial discovery
+ let cfg = fixture.test_config();
+ let opts = build_mqtt_options(&cfg, "127.0.0.1", fixture.mqtt_port());
+ let (svc_client, mut svc_eventloop) = AsyncClient::new(opts, 10);
+
+ // Spawn wait_for_serial first (it subscribes then polls for the serial topic)
+ let svc_client_clone = svc_client.clone();
+ let discovery =
+ tokio::spawn(async move { wait_for_serial(&svc_client_clone, &mut svc_eventloop).await });
+
+ // Give the service client time to connect and subscribe
+ tokio::time::sleep(Duration::from_millis(500)).await;
+
+ // Now publish the serial announcement
+ pub_client
+ .publish(
+ format!("N/{TEST_SERIAL}/system/0/Serial"),
+ QoS::AtLeastOnce,
+ false,
+ TEST_SERIAL,
+ )
+ .await?;
+
+ let discovered = tokio::time::timeout(Duration::from_secs(5), discovery).await???;
+ assert_eq!(discovered, TEST_SERIAL);
+
+ Ok(())
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn test_mqtt_all_properties_to_dbus() -> Result<()> {
+ let fixture = E2ETestFixture::new();
+
+ // Start D-Bus service
+ let (conn, dbus_state) = start_service_at(fixture.dbus_address()).await?;
+
+ // Create publisher
+ let (pub_client, mut pub_eventloop) = fixture.create_publisher_client("props-pub");
+ tokio::spawn(async move { while (pub_eventloop.poll().await).is_ok() {} });
+
+ // Create service MQTT client
+ let cfg = fixture.test_config();
+ let opts = build_mqtt_options(&cfg, "127.0.0.1", fixture.mqtt_port());
+ let (svc_client, mut svc_eventloop) = AsyncClient::new(opts, 10);
+
+ // Spawn the service reader that subscribes and updates D-Bus properties
+ let conn_clone = conn.clone();
+ let dbus_state_clone = dbus_state.clone();
+ let reader = tokio::spawn(async move {
+ read_values_and_update_properties_immediately(
+ &svc_client,
+ &mut svc_eventloop,
+ TEST_SERIAL,
+ &conn_clone,
+ &dbus_state_clone,
+ )
+ .await
+ });
+
+ // Give the reader time to subscribe
+ tokio::time::sleep(Duration::from_millis(300)).await;
+
+ // Publish Batteries JSON + solar + AC topics
+ publish_all_topics(
+ &pub_client,
+ 78.9,
+ 25.5,
+ VICTRON_STATE_CHARGING,
+ 310.0,
+ 82.0,
+ 77.0,
+ )
+ .await?;
+
+ // Wait for the reader to finish (all topics received)
+ let all_received = tokio::time::timeout(Duration::from_secs(10), reader).await???;
+ assert!(all_received, "expected all topics to be received");
+
+ // Small delay for signal propagation
+ tokio::time::sleep(Duration::from_millis(100)).await;
+
+ // Read D-Bus properties via proxies
+ let client = fixture.dbus_client_connection().await?;
+ let battery = BatteryProxy::new(&client).await?;
+ let solar = SolarProxy::new(&client).await?;
+
+ assert!((battery.soc().await? - 78.9).abs() < f64::EPSILON);
+ assert!((battery.power().await? - 25.5).abs() < f64::EPSILON);
+ assert!((solar.power().await? - 310.0).abs() < f64::EPSILON);
+ assert_eq!(battery.state().await?, "charging");
+
+ let ac_input = AcInputProxy::new(&client).await?;
+ let ac_load = AcLoadProxy::new(&client).await?;
+ assert!((ac_input.power().await? - 82.0).abs() < f64::EPSILON);
+ assert!((ac_load.power().await? - 77.0).abs() < f64::EPSILON);
+
+ Ok(())
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn test_mqtt_battery_state_charging() -> Result<()> {
+ let fixture = E2ETestFixture::new();
+ let (conn, dbus_state) = start_service_at(fixture.dbus_address()).await?;
+
+ let (pub_client, mut pub_eventloop) = fixture.create_publisher_client("charge-pub");
+ tokio::spawn(async move { while (pub_eventloop.poll().await).is_ok() {} });
+
+ let cfg = fixture.test_config();
+ let opts = build_mqtt_options(&cfg, "127.0.0.1", fixture.mqtt_port());
+ let (svc_client, mut svc_eventloop) = AsyncClient::new(opts, 10);
+
+ let conn_clone = conn.clone();
+ let dbus_state_clone = dbus_state.clone();
+ let reader = tokio::spawn(async move {
+ read_values_and_update_properties_immediately(
+ &svc_client,
+ &mut svc_eventloop,
+ TEST_SERIAL,
+ &conn_clone,
+ &dbus_state_clone,
+ )
+ .await
+ });
+
+ tokio::time::sleep(Duration::from_millis(300)).await;
+ publish_all_topics(
+ &pub_client,
+ 60.0,
+ 15.0,
+ VICTRON_STATE_CHARGING,
+ 200.0,
+ 100.0,
+ 90.0,
+ )
+ .await?;
+
+ let all_received = tokio::time::timeout(Duration::from_secs(10), reader).await???;
+ assert!(all_received);
+
+ tokio::time::sleep(Duration::from_millis(100)).await;
+
+ let client = fixture.dbus_client_connection().await?;
+ let battery = BatteryProxy::new(&client).await?;
+ assert_eq!(battery.state().await?, "charging");
+
+ Ok(())
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn test_mqtt_battery_state_discharging() -> Result<()> {
+ let fixture = E2ETestFixture::new();
+ let (conn, dbus_state) = start_service_at(fixture.dbus_address()).await?;
+
+ let (pub_client, mut pub_eventloop) = fixture.create_publisher_client("discharge-pub");
+ tokio::spawn(async move { while (pub_eventloop.poll().await).is_ok() {} });
+
+ let cfg = fixture.test_config();
+ let opts = build_mqtt_options(&cfg, "127.0.0.1", fixture.mqtt_port());
+ let (svc_client, mut svc_eventloop) = AsyncClient::new(opts, 10);
+
+ let conn_clone = conn.clone();
+ let dbus_state_clone = dbus_state.clone();
+ let reader = tokio::spawn(async move {
+ read_values_and_update_properties_immediately(
+ &svc_client,
+ &mut svc_eventloop,
+ TEST_SERIAL,
+ &conn_clone,
+ &dbus_state_clone,
+ )
+ .await
+ });
+
+ tokio::time::sleep(Duration::from_millis(300)).await;
+ publish_all_topics(
+ &pub_client,
+ 45.0,
+ -20.0,
+ VICTRON_STATE_DISCHARGING,
+ 0.0,
+ 0.0,
+ 50.0,
+ )
+ .await?;
+
+ let all_received = tokio::time::timeout(Duration::from_secs(10), reader).await???;
+ assert!(all_received);
+
+ tokio::time::sleep(Duration::from_millis(100)).await;
+
+ let client = fixture.dbus_client_connection().await?;
+ let battery = BatteryProxy::new(&client).await?;
+ assert_eq!(battery.state().await?, "discharging");
+
+ Ok(())
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn test_mqtt_battery_state_idle() -> Result<()> {
+ let fixture = E2ETestFixture::new();
+ let (conn, dbus_state) = start_service_at(fixture.dbus_address()).await?;
+
+ let (pub_client, mut pub_eventloop) = fixture.create_publisher_client("idle-pub");
+ tokio::spawn(async move { while (pub_eventloop.poll().await).is_ok() {} });
+
+ let cfg = fixture.test_config();
+ let opts = build_mqtt_options(&cfg, "127.0.0.1", fixture.mqtt_port());
+ let (svc_client, mut svc_eventloop) = AsyncClient::new(opts, 10);
+
+ let conn_clone = conn.clone();
+ let dbus_state_clone = dbus_state.clone();
+ let reader = tokio::spawn(async move {
+ read_values_and_update_properties_immediately(
+ &svc_client,
+ &mut svc_eventloop,
+ TEST_SERIAL,
+ &conn_clone,
+ &dbus_state_clone,
+ )
+ .await
+ });
+
+ tokio::time::sleep(Duration::from_millis(300)).await;
+ publish_all_topics(&pub_client, 80.0, 2.0, VICTRON_STATE_IDLE, 50.0, 0.0, 30.0).await?;
+
+ let all_received = tokio::time::timeout(Duration::from_secs(10), reader).await???;
+ assert!(all_received);
+
+ tokio::time::sleep(Duration::from_millis(100)).await;
+
+ let client = fixture.dbus_client_connection().await?;
+ let battery = BatteryProxy::new(&client).await?;
+ assert_eq!(battery.state().await?, "idle");
+
+ Ok(())
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn test_mqtt_json_payload_format() -> Result<()> {
+ let fixture = E2ETestFixture::new();
+ let (conn, dbus_state) = start_service_at(fixture.dbus_address()).await?;
+
+ let (pub_client, mut pub_eventloop) = fixture.create_publisher_client("json-pub");
+ tokio::spawn(async move { while (pub_eventloop.poll().await).is_ok() {} });
+
+ let cfg = fixture.test_config();
+ let opts = build_mqtt_options(&cfg, "127.0.0.1", fixture.mqtt_port());
+ let (svc_client, mut svc_eventloop) = AsyncClient::new(opts, 10);
+
+ let conn_clone = conn.clone();
+ let dbus_state_clone = dbus_state.clone();
+ let reader = tokio::spawn(async move {
+ read_values_and_update_properties_immediately(
+ &svc_client,
+ &mut svc_eventloop,
+ TEST_SERIAL,
+ &conn_clone,
+ &dbus_state_clone,
+ )
+ .await
+ });
+
+ tokio::time::sleep(Duration::from_millis(300)).await;
+
+ // Publish Batteries with JSON value format + solar with {"value": <number>} + AC topics
+ pub_client
+ .publish(
+ format!("N/{TEST_SERIAL}/system/0/Batteries"),
+ QoS::AtLeastOnce,
+ false,
+ batteries_json(225.5, 30.0, VICTRON_STATE_CHARGING),
+ )
+ .await?;
+ pub_client
+ .publish(
+ format!("N/{TEST_SERIAL}/solarcharger/277/Yield/Power"),
+ QoS::AtLeastOnce,
+ false,
+ r#"{"value": 450.0}"#,
+ )
+ .await?;
+ pub_client
+ .publish(
+ format!("N/{TEST_SERIAL}/system/0/Ac/Grid/L1/Power"),
+ QoS::AtLeastOnce,
+ false,
+ r#"{"value": 120.0}"#,
+ )
+ .await?;
+ pub_client
+ .publish(
+ format!("N/{TEST_SERIAL}/system/0/Ac/Consumption/L1/Power"),
+ QoS::AtLeastOnce,
+ false,
+ r#"{"value": 95.0}"#,
+ )
+ .await?;
+
+ let all_received = tokio::time::timeout(Duration::from_secs(10), reader).await???;
+ assert!(all_received, "expected all topics to be received");
+
+ tokio::time::sleep(Duration::from_millis(100)).await;
+
+ let client = fixture.dbus_client_connection().await?;
+ let battery = BatteryProxy::new(&client).await?;
+ let solar = SolarProxy::new(&client).await?;
+ let ac_input = AcInputProxy::new(&client).await?;
+ let ac_load = AcLoadProxy::new(&client).await?;
+
+ assert!((battery.soc().await? - 225.5).abs() < f64::EPSILON);
+ assert!((battery.power().await? - 30.0).abs() < f64::EPSILON);
+ assert!((solar.power().await? - 450.0).abs() < f64::EPSILON);
+ assert!((ac_input.power().await? - 120.0).abs() < f64::EPSILON);
+ assert!((ac_load.power().await? - 95.0).abs() < f64::EPSILON);
+
+ Ok(())
+}
+
+#[ignore] // Takes ~30s due to internal timeout; run with `cargo test -- --ignored`
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+async fn test_mqtt_timeout_with_partial_data() -> Result<()> {
+ let fixture = E2ETestFixture::new();
+ let (conn, dbus_state) = start_service_at(fixture.dbus_address()).await?;
+
+ let (pub_client, mut pub_eventloop) = fixture.create_publisher_client("partial-pub");
+ tokio::spawn(async move { while (pub_eventloop.poll().await).is_ok() {} });
+
+ let cfg = fixture.test_config();
+ let opts = build_mqtt_options(&cfg, "127.0.0.1", fixture.mqtt_port());
+ let (svc_client, mut svc_eventloop) = AsyncClient::new(opts, 10);
+
+ let conn_clone = conn.clone();
+ let dbus_state_clone = dbus_state.clone();
+ let reader = tokio::spawn(async move {
+ read_values_and_update_properties_immediately(
+ &svc_client,
+ &mut svc_eventloop,
+ TEST_SERIAL,
+ &conn_clone,
+ &dbus_state_clone,
+ )
+ .await
+ });
+
+ tokio::time::sleep(Duration::from_millis(300)).await;
+
+ // Only publish Batteries (omit solar + AC) — 1 of 4 topics
+ pub_client
+ .publish(
+ format!("N/{TEST_SERIAL}/system/0/Batteries"),
+ QoS::AtLeastOnce,
+ false,
+ batteries_json(55.0, 10.0, VICTRON_STATE_CHARGING),
+ )
+ .await?;
+
+ // Should return false after the 30s internal timeout
+ let all_received = tokio::time::timeout(Duration::from_secs(35), reader).await???;
+ assert!(!all_received, "expected timeout with partial data");
+
+ // The received battery properties should be set on D-Bus
+ let client = fixture.dbus_client_connection().await?;
+ let battery = BatteryProxy::new(&client).await?;
+
+ assert!((battery.soc().await? - 55.0).abs() < f64::EPSILON);
+ assert!((battery.power().await? - 10.0).abs() < f64::EPSILON);
+ // Solar should still be at default (0.0)
+ let solar = SolarProxy::new(&client).await?;
+ assert!((solar.power().await? - 0.0).abs() < f64::EPSILON);
+
+ Ok(())
+}