diff options
| author | Dawid Rycerz <dawid@rycerz.xyz> | 2026-02-07 17:29:48 +0100 |
|---|---|---|
| committer | Dawid Rycerz <dawid@rycerz.xyz> | 2026-02-07 17:29:48 +0100 |
| commit | 2eda97537b63d68b2e9ba06500e3fb491894d10c (patch) | |
| tree | 52873ad380cd97f4327765aac24659a2b00079b1 /service/src/victron_mqtt.rs | |
feat: camper van energy monitoring widget for Plasma 6main
Pure QML KPackage widget with Rust background service for real-time
Victron energy system monitoring via MQTT and D-Bus.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Diffstat (limited to 'service/src/victron_mqtt.rs')
| -rw-r--r-- | service/src/victron_mqtt.rs | 576 |
1 files changed, 576 insertions, 0 deletions
diff --git a/service/src/victron_mqtt.rs b/service/src/victron_mqtt.rs new file mode 100644 index 0000000..d72ddb3 --- /dev/null +++ b/service/src/victron_mqtt.rs @@ -0,0 +1,576 @@ +use std::time::Duration; + +use crate::config::Config; +use crate::dbus::{PropertyType, SharedState, update_battery_state_str, update_single_property}; +use anyhow::{Result, anyhow}; +use rumqttc::{AsyncClient, Event, EventLoop, Incoming, MqttOptions, QoS}; +use tokio::{net::TcpStream, select, time}; +use tracing::{debug, info, warn}; +use zbus::Connection; + +// Topic-to-property mapping structure (for simple 1:1 topic→property mappings) +#[derive(Debug, Clone)] +pub struct TopicPropertyMapping { + pub topic_pattern: String, + pub property_type: PropertyType, +} + +impl TopicPropertyMapping { + pub fn new(topic_pattern: String, property_type: PropertyType) -> Self { + Self { + topic_pattern, + property_type, + } + } +} + +// Collection of all topic mappings for a given serial +pub struct MqttPropertyMappings { + mappings: Vec<TopicPropertyMapping>, + batteries_topic: String, // system/0/Batteries — yields SoC, Power, State +} + +impl MqttPropertyMappings { + pub fn new(serial: &str) -> Self { + Self { + mappings: vec![ + TopicPropertyMapping::new( + format!("N/{serial}/solarcharger/277/Yield/Power"), + PropertyType::SolarPower, + ), + TopicPropertyMapping::new( + format!("N/{serial}/system/0/Ac/Grid/L1/Power"), + PropertyType::AcInputPower, + ), + TopicPropertyMapping::new( + format!("N/{serial}/system/0/Ac/Consumption/L1/Power"), + PropertyType::AcLoadPower, + ), + ], + batteries_topic: format!("N/{serial}/system/0/Batteries"), + } + } + + // Find mapping for a given topic (simple 1:1 mappings only) + pub fn find_mapping(&self, topic: &str) -> Option<&TopicPropertyMapping> { + self.mappings.iter().find(|m| m.topic_pattern == topic) + } + + // Check if topic is the composite Batteries topic + pub fn is_batteries_topic(&self, topic: &str) -> bool { + self.batteries_topic == topic + } + + // Get all topic patterns for subscription + pub fn get_subscription_topics(&self) -> Vec<&str> { + let mut topics: Vec<&str> = self + .mappings + .iter() + .map(|m| m.topic_pattern.as_str()) + .collect(); + topics.push(&self.batteries_topic); + topics + } +} + +/// Parse the system/0/Batteries JSON payload. +/// Returns (soc, power, state_string) from the first battery entry. +pub fn parse_batteries_payload(payload: &str) -> Option<(f64, f64, String)> { + let json: serde_json::Value = serde_json::from_str(payload).ok()?; + let batteries = json.get("value")?.as_array()?; + let battery = batteries.first()?; + let soc = battery.get("soc")?.as_f64()?; + let power = battery.get("power")?.as_f64()?; + let state_num = battery.get("state")?.as_i64().unwrap_or(0); + let state_str = match state_num { + 1 => "charging", + 2 => "discharging", + _ => "idle", + }; + Some((soc, power, state_str.to_string())) +} + +// Parse numeric value from MQTT payload (moved from main.rs) +fn parse_numeric_value(s: Option<&String>) -> f64 { + if let Some(raw) = s { + // First try direct number parsing + if let Ok(v) = raw.trim().parse::<f64>() { + return v; + } + // Try JSON object with {"value": number} + if let Ok(val) = serde_json::from_str::<serde_json::Value>(raw) + && let Some(v) = val.get("value") + { + if let Some(n) = v.as_f64() { + return n; + } + if let Some(s) = v.as_str() + && let Ok(n) = s.trim().parse::<f64>() + { + return n; + } + } + } + 0.0 +} + +// Returns Some(serial) when topic is like "N/{serial}/system/0/Serial" +pub fn extract_serial_from_topic(topic: &str) -> Option<String> { + if topic.starts_with("N/") && topic.ends_with("/system/0/Serial") { + let parts: Vec<&str> = topic.split('/').collect(); + if parts.len() >= 3 { + return Some(parts[1].to_string()); + } + } + None +} + +pub async fn disconnect_mqtt(client: &AsyncClient) { + if let Err(e) = client.disconnect().await { + warn!("MQTT disconnect failed: {}", e); + } +} + +pub fn build_mqtt_options(cfg: &Config, host: &str, port: u16) -> MqttOptions { + let mut opts = MqttOptions::new(&cfg.client_id, host, port); + opts.set_keep_alive(Duration::from_secs(10)); + if let (Some(u), Some(p)) = (&cfg.username, &cfg.password) { + opts.set_credentials(u, p); + } + opts +} + +pub async fn pick_first_available(endpoints: &[(String, u16)]) -> Result<(String, u16)> { + for (host, port) in endpoints { + let addr = (host.as_str(), *port); + let attempt = time::timeout(Duration::from_secs(2), TcpStream::connect(addr)).await; + match attempt { + Ok(Ok(stream)) => { + // Successfully connected; close and return this endpoint + drop(stream); + return Ok((host.clone(), *port)); + } + _ => { + // Try next + } + } + } + Err(anyhow!("No reachable MQTT endpoint from config")) +} + +pub async fn wait_for_serial(client: &AsyncClient, eventloop: &mut EventLoop) -> Result<String> { + // Subscribe to N/+/system/0/Serial + client + .subscribe("N/+/system/0/Serial", QoS::AtLeastOnce) + .await?; + loop { + match eventloop.poll().await? { + Event::Incoming(Incoming::Publish(p)) => { + let topic = p.topic.clone(); + if let Some(serial) = extract_serial_from_topic(&topic) { + info!("Detected serial from topic: {}", serial); + return Ok(serial); + } + } + _ => { + // Ignore other events + } + } + } +} + +/// Read values and update DBus properties immediately as each message arrives. +/// Returns `Ok(true)` if all topics were received, `Ok(false)` on timeout with partial data. +pub async fn read_values_and_update_properties_immediately( + client: &AsyncClient, + eventloop: &mut EventLoop, + serial: &str, + conn: &Connection, + dbus_state: &SharedState, +) -> Result<bool> { + let mappings = MqttPropertyMappings::new(serial); + let topics = mappings.get_subscription_topics(); + + // Subscribe to all topics + for topic in &topics { + client.subscribe(*topic, QoS::AtLeastOnce).await?; + } + debug!( + "Subscribed to {} topics for serial={}", + topics.len(), + serial + ); + + let mut received_topics = std::collections::HashSet::new(); + let total_topics = topics.len(); + + // We'll wait up to 30 seconds for all values + let timeout = time::sleep(Duration::from_secs(30)); + tokio::pin!(timeout); + + let all_received = loop { + select! { + _ = &mut timeout => { + warn!("Timeout while waiting for all values, received {} of {}", received_topics.len(), total_topics); + break false; + } + ev = eventloop.poll() => { + match ev? { + Event::Incoming(Incoming::Publish(p)) => { + let topic = p.topic.clone(); + let payload_str = String::from_utf8_lossy(&p.payload).to_string(); + + if let Some(mapping) = mappings.find_mapping(&topic) { + let value = parse_numeric_value(Some(&payload_str)); + update_single_property(conn, dbus_state, &mapping.property_type, value).await?; + received_topics.insert(topic.clone()); + info!("Updated {:?} with value {} ({} of {} topics received)", + mapping.property_type, value, received_topics.len(), total_topics); + } else if mappings.is_batteries_topic(&topic) + && let Some((soc, power, state_str)) = parse_batteries_payload(&payload_str) + { + update_single_property(conn, dbus_state, &PropertyType::BatterySoc, soc).await?; + update_single_property(conn, dbus_state, &PropertyType::BatteryPower, power).await?; + update_battery_state_str(conn, dbus_state, &state_str).await?; + received_topics.insert(topic.clone()); + info!("Updated battery from Batteries topic: soc={}, power={}, state={} ({} of {} topics received)", + soc, power, state_str, received_topics.len(), total_topics); + } + + if received_topics.len() >= total_topics { + info!("All {} properties updated successfully", total_topics); + break true; + } + } + _ => { + // Ignore other events (ConnAck, SubAck, etc.) + } + } + } + } + }; + + Ok(all_received) +} + +#[cfg(test)] +mod tests { + use super::*; + use tokio::net::TcpListener; + + #[test] + fn test_extract_serial_from_topic() { + assert_eq!( + extract_serial_from_topic("N/abc123/system/0/Serial"), + Some("abc123".to_string()) + ); + assert_eq!( + extract_serial_from_topic("N/xyz/system/0/Serial"), + Some("xyz".to_string()) + ); + assert_eq!( + extract_serial_from_topic("N//system/0/Serial"), + Some("".to_string()) + ); + assert_eq!(extract_serial_from_topic("N/abc123/system/0/Other"), None); + assert_eq!(extract_serial_from_topic("foo/bar"), None); + } + + #[tokio::test] + async fn test_pick_first_available_picks_open_second() { + // Start a listener for the second endpoint only + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + + let endpoints = vec![ + ("127.0.0.1".to_string(), 1), // very likely closed + (addr.ip().to_string(), addr.port()), + ]; + + let chosen = pick_first_available(&endpoints).await.unwrap(); + assert_eq!(chosen, (addr.ip().to_string(), addr.port())); + + drop(listener); + } + + #[tokio::test] + async fn test_pick_first_available_prefers_first_when_both_open() { + let listener1 = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr1 = listener1.local_addr().unwrap(); + let listener2 = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr2 = listener2.local_addr().unwrap(); + + let endpoints = vec![ + (addr1.ip().to_string(), addr1.port()), + (addr2.ip().to_string(), addr2.port()), + ]; + + let chosen = pick_first_available(&endpoints).await.unwrap(); + assert_eq!(chosen, (addr1.ip().to_string(), addr1.port())); + + drop(listener1); + drop(listener2); + } + + #[test] + fn test_parse_numeric_value_direct() { + let value = "123.45".to_string(); + let input = Some(&value); + assert_eq!(parse_numeric_value(input), 123.45); + } + + #[test] + fn test_parse_numeric_value_json() { + let value = r#"{"value": 67.89}"#.to_string(); + let input = Some(&value); + assert_eq!(parse_numeric_value(input), 67.89); + } + + #[test] + fn test_parse_numeric_value_json_string() { + let value = r#"{"value": "90.12"}"#.to_string(); + let input = Some(&value); + assert_eq!(parse_numeric_value(input), 90.12); + } + + #[test] + fn test_parse_numeric_value_invalid() { + let value = "invalid".to_string(); + let input = Some(&value); + assert_eq!(parse_numeric_value(input), 0.0); + } + + #[test] + fn test_parse_numeric_value_none() { + let input = None; + assert_eq!(parse_numeric_value(input), 0.0); + } + + #[test] + fn test_parse_numeric_value_whitespace() { + let value = " 42.5 ".to_string(); + let input = Some(&value); + assert_eq!(parse_numeric_value(input), 42.5); + } + + #[test] + fn test_topic_property_mapping() { + let mappings = MqttPropertyMappings::new("test123"); + let topics = mappings.get_subscription_topics(); + + assert_eq!(topics.len(), 4); + assert!(topics.contains(&"N/test123/solarcharger/277/Yield/Power")); + assert!(topics.contains(&"N/test123/system/0/Ac/Grid/L1/Power")); + assert!(topics.contains(&"N/test123/system/0/Ac/Consumption/L1/Power")); + assert!(topics.contains(&"N/test123/system/0/Batteries")); + + // Test finding solar mapping + let solar_mapping = mappings.find_mapping("N/test123/solarcharger/277/Yield/Power"); + assert!(solar_mapping.is_some()); + assert_eq!( + solar_mapping.unwrap().property_type, + PropertyType::SolarPower + ); + + // Test finding AC input mapping + let ac_input_mapping = mappings.find_mapping("N/test123/system/0/Ac/Grid/L1/Power"); + assert!(ac_input_mapping.is_some()); + assert_eq!( + ac_input_mapping.unwrap().property_type, + PropertyType::AcInputPower + ); + + // Test finding AC load mapping + let ac_load_mapping = mappings.find_mapping("N/test123/system/0/Ac/Consumption/L1/Power"); + assert!(ac_load_mapping.is_some()); + assert_eq!( + ac_load_mapping.unwrap().property_type, + PropertyType::AcLoadPower + ); + + // Batteries topic is handled specially, not in find_mapping + assert!( + mappings + .find_mapping("N/test123/system/0/Batteries") + .is_none() + ); + assert!(mappings.is_batteries_topic("N/test123/system/0/Batteries")); + } + + // --- parse_numeric_value: fallback chain and design decisions --- + + #[test] + fn test_parse_numeric_value_negative() { + let value = "-123.45".to_string(); + assert_eq!(parse_numeric_value(Some(&value)), -123.45); + } + + #[test] + fn test_parse_numeric_value_json_negative() { + let value = r#"{"value": -67.89}"#.to_string(); + assert_eq!(parse_numeric_value(Some(&value)), -67.89); + } + + #[test] + fn test_parse_numeric_value_json_null() { + let value = r#"{"value": null}"#.to_string(); + assert_eq!(parse_numeric_value(Some(&value)), 0.0); + } + + #[test] + fn test_parse_numeric_value_json_missing_value_key() { + let value = r#"{"power": 42}"#.to_string(); + assert_eq!(parse_numeric_value(Some(&value)), 0.0); + } + + #[test] + fn test_parse_numeric_value_empty_string() { + let value = "".to_string(); + assert_eq!(parse_numeric_value(Some(&value)), 0.0); + } + + #[test] + fn test_parse_numeric_value_json_with_extra_fields() { + let value = r#"{"value": 42, "unit": "W"}"#.to_string(); + assert_eq!(parse_numeric_value(Some(&value)), 42.0); + } + + #[test] + fn test_parse_numeric_value_json_non_numeric_string() { + let value = r#"{"value": "n/a"}"#.to_string(); + assert_eq!(parse_numeric_value(Some(&value)), 0.0); + } + + // --- extract_serial_from_topic: boundary cases --- + + #[test] + fn test_extract_serial_empty_topic() { + assert_eq!(extract_serial_from_topic(""), None); + } + + #[test] + fn test_extract_serial_data_topic_rejected() { + // Data topics must not be mistaken for serial discovery topics + assert_eq!( + extract_serial_from_topic("N/abc/battery/278/Dc/0/Power"), + None + ); + } + + // --- MqttPropertyMappings: Victron device mapping design --- + + #[test] + fn test_mapping_find_returns_none_for_unknown_topic() { + let mappings = MqttPropertyMappings::new("serial1"); + assert!(mappings.find_mapping("N/serial1/unknown/0/Foo").is_none()); + } + + #[test] + fn test_mapping_solar_power_exists() { + let mappings = MqttPropertyMappings::new("s1"); + let solar = mappings + .find_mapping("N/s1/solarcharger/277/Yield/Power") + .expect("solar mapping should exist"); + assert_eq!(solar.property_type, PropertyType::SolarPower); + } + + #[test] + fn test_mapping_different_serials_dont_cross_match() { + let mappings_a = MqttPropertyMappings::new("AAA"); + let mappings_b = MqttPropertyMappings::new("BBB"); + // Simple mappings from serial AAA must not match in serial BBB + for topic in mappings_a.get_subscription_topics() { + assert!( + mappings_b.find_mapping(topic).is_none(), + "serial BBB should not match topic from serial AAA: {topic}" + ); + if mappings_a.is_batteries_topic(topic) { + assert!( + !mappings_b.is_batteries_topic(topic), + "serial BBB batteries topic should not match serial AAA: {topic}" + ); + } + } + } + + #[test] + fn test_parse_batteries_payload_charging() { + let payload = r#"{"value":[{"active_battery_service":true,"current":1.8,"id":"com.victronenergy.battery.ttyS7","instance":279,"name":"Shunt","power":48.13,"soc":67.1,"state":1,"voltage":26.74}]}"#; + let (soc, power, state) = parse_batteries_payload(payload).unwrap(); + assert!((soc - 67.1).abs() < 0.01); + assert!((power - 48.13).abs() < 0.01); + assert_eq!(state, "charging"); + } + + #[test] + fn test_parse_batteries_payload_discharging() { + let payload = r#"{"value":[{"soc":50.0,"power":-334.78,"state":2}]}"#; + let (soc, power, state) = parse_batteries_payload(payload).unwrap(); + assert!((soc - 50.0).abs() < 0.01); + assert!((power - (-334.78)).abs() < 0.01); + assert_eq!(state, "discharging"); + } + + #[test] + fn test_parse_batteries_payload_idle() { + let payload = r#"{"value":[{"soc":100.0,"power":0.0,"state":0}]}"#; + let (soc, _power, state) = parse_batteries_payload(payload).unwrap(); + assert!((soc - 100.0).abs() < 0.01); + assert_eq!(state, "idle"); + } + + #[test] + fn test_parse_batteries_payload_invalid() { + assert!(parse_batteries_payload("not json").is_none()); + assert!(parse_batteries_payload(r#"{"value":[]}"#).is_none()); + assert!(parse_batteries_payload(r#"{"value":"nope"}"#).is_none()); + } + + #[test] + fn test_mapping_topics_embed_serial() { + let serial = "mySerial42"; + let mappings = MqttPropertyMappings::new(serial); + for topic in mappings.get_subscription_topics() { + assert!( + topic.contains(serial), + "topic should contain serial: {topic}" + ); + } + } + + // --- build_mqtt_options: configuration policy --- + + #[test] + fn test_build_mqtt_options_applies_credentials_when_present() { + let cfg = Config { + endpoints: vec![("127.0.0.1".to_string(), 1883)], + username: Some("user".to_string()), + password: Some("pass".to_string()), + client_id: "test-client".to_string(), + refresh_interval_seconds: 60, + }; + let opts = build_mqtt_options(&cfg, "127.0.0.1", 1883); + let login = opts.credentials().expect("credentials should be set"); + assert_eq!(login.username, "user"); + assert_eq!(login.password, "pass"); + assert_eq!(opts.keep_alive(), Duration::from_secs(10)); + } + + #[test] + fn test_build_mqtt_options_skips_credentials_when_absent() { + let cfg = Config { + endpoints: vec![("127.0.0.1".to_string(), 1883)], + username: None, + password: None, + client_id: "anon-client".to_string(), + refresh_interval_seconds: 60, + }; + let opts = build_mqtt_options(&cfg, "127.0.0.1", 1883); + assert!( + opts.credentials().is_none(), + "anonymous mode should have no credentials" + ); + assert_eq!(opts.client_id(), "anon-client"); + assert_eq!(opts.keep_alive(), Duration::from_secs(10)); + } +} |
