summaryrefslogtreecommitdiff
path: root/service/src/victron_mqtt.rs
diff options
context:
space:
mode:
Diffstat (limited to 'service/src/victron_mqtt.rs')
-rw-r--r--service/src/victron_mqtt.rs576
1 files changed, 576 insertions, 0 deletions
diff --git a/service/src/victron_mqtt.rs b/service/src/victron_mqtt.rs
new file mode 100644
index 0000000..d72ddb3
--- /dev/null
+++ b/service/src/victron_mqtt.rs
@@ -0,0 +1,576 @@
+use std::time::Duration;
+
+use crate::config::Config;
+use crate::dbus::{PropertyType, SharedState, update_battery_state_str, update_single_property};
+use anyhow::{Result, anyhow};
+use rumqttc::{AsyncClient, Event, EventLoop, Incoming, MqttOptions, QoS};
+use tokio::{net::TcpStream, select, time};
+use tracing::{debug, info, warn};
+use zbus::Connection;
+
+// Topic-to-property mapping structure (for simple 1:1 topic→property mappings)
+#[derive(Debug, Clone)]
+pub struct TopicPropertyMapping {
+ pub topic_pattern: String,
+ pub property_type: PropertyType,
+}
+
+impl TopicPropertyMapping {
+ pub fn new(topic_pattern: String, property_type: PropertyType) -> Self {
+ Self {
+ topic_pattern,
+ property_type,
+ }
+ }
+}
+
+// Collection of all topic mappings for a given serial
+pub struct MqttPropertyMappings {
+ mappings: Vec<TopicPropertyMapping>,
+ batteries_topic: String, // system/0/Batteries — yields SoC, Power, State
+}
+
+impl MqttPropertyMappings {
+ pub fn new(serial: &str) -> Self {
+ Self {
+ mappings: vec![
+ TopicPropertyMapping::new(
+ format!("N/{serial}/solarcharger/277/Yield/Power"),
+ PropertyType::SolarPower,
+ ),
+ TopicPropertyMapping::new(
+ format!("N/{serial}/system/0/Ac/Grid/L1/Power"),
+ PropertyType::AcInputPower,
+ ),
+ TopicPropertyMapping::new(
+ format!("N/{serial}/system/0/Ac/Consumption/L1/Power"),
+ PropertyType::AcLoadPower,
+ ),
+ ],
+ batteries_topic: format!("N/{serial}/system/0/Batteries"),
+ }
+ }
+
+ // Find mapping for a given topic (simple 1:1 mappings only)
+ pub fn find_mapping(&self, topic: &str) -> Option<&TopicPropertyMapping> {
+ self.mappings.iter().find(|m| m.topic_pattern == topic)
+ }
+
+ // Check if topic is the composite Batteries topic
+ pub fn is_batteries_topic(&self, topic: &str) -> bool {
+ self.batteries_topic == topic
+ }
+
+ // Get all topic patterns for subscription
+ pub fn get_subscription_topics(&self) -> Vec<&str> {
+ let mut topics: Vec<&str> = self
+ .mappings
+ .iter()
+ .map(|m| m.topic_pattern.as_str())
+ .collect();
+ topics.push(&self.batteries_topic);
+ topics
+ }
+}
+
+/// Parse the system/0/Batteries JSON payload.
+/// Returns (soc, power, state_string) from the first battery entry.
+pub fn parse_batteries_payload(payload: &str) -> Option<(f64, f64, String)> {
+ let json: serde_json::Value = serde_json::from_str(payload).ok()?;
+ let batteries = json.get("value")?.as_array()?;
+ let battery = batteries.first()?;
+ let soc = battery.get("soc")?.as_f64()?;
+ let power = battery.get("power")?.as_f64()?;
+ let state_num = battery.get("state")?.as_i64().unwrap_or(0);
+ let state_str = match state_num {
+ 1 => "charging",
+ 2 => "discharging",
+ _ => "idle",
+ };
+ Some((soc, power, state_str.to_string()))
+}
+
+// Parse numeric value from MQTT payload (moved from main.rs)
+fn parse_numeric_value(s: Option<&String>) -> f64 {
+ if let Some(raw) = s {
+ // First try direct number parsing
+ if let Ok(v) = raw.trim().parse::<f64>() {
+ return v;
+ }
+ // Try JSON object with {"value": number}
+ if let Ok(val) = serde_json::from_str::<serde_json::Value>(raw)
+ && let Some(v) = val.get("value")
+ {
+ if let Some(n) = v.as_f64() {
+ return n;
+ }
+ if let Some(s) = v.as_str()
+ && let Ok(n) = s.trim().parse::<f64>()
+ {
+ return n;
+ }
+ }
+ }
+ 0.0
+}
+
+// Returns Some(serial) when topic is like "N/{serial}/system/0/Serial"
+pub fn extract_serial_from_topic(topic: &str) -> Option<String> {
+ if topic.starts_with("N/") && topic.ends_with("/system/0/Serial") {
+ let parts: Vec<&str> = topic.split('/').collect();
+ if parts.len() >= 3 {
+ return Some(parts[1].to_string());
+ }
+ }
+ None
+}
+
+pub async fn disconnect_mqtt(client: &AsyncClient) {
+ if let Err(e) = client.disconnect().await {
+ warn!("MQTT disconnect failed: {}", e);
+ }
+}
+
+pub fn build_mqtt_options(cfg: &Config, host: &str, port: u16) -> MqttOptions {
+ let mut opts = MqttOptions::new(&cfg.client_id, host, port);
+ opts.set_keep_alive(Duration::from_secs(10));
+ if let (Some(u), Some(p)) = (&cfg.username, &cfg.password) {
+ opts.set_credentials(u, p);
+ }
+ opts
+}
+
+pub async fn pick_first_available(endpoints: &[(String, u16)]) -> Result<(String, u16)> {
+ for (host, port) in endpoints {
+ let addr = (host.as_str(), *port);
+ let attempt = time::timeout(Duration::from_secs(2), TcpStream::connect(addr)).await;
+ match attempt {
+ Ok(Ok(stream)) => {
+ // Successfully connected; close and return this endpoint
+ drop(stream);
+ return Ok((host.clone(), *port));
+ }
+ _ => {
+ // Try next
+ }
+ }
+ }
+ Err(anyhow!("No reachable MQTT endpoint from config"))
+}
+
+pub async fn wait_for_serial(client: &AsyncClient, eventloop: &mut EventLoop) -> Result<String> {
+ // Subscribe to N/+/system/0/Serial
+ client
+ .subscribe("N/+/system/0/Serial", QoS::AtLeastOnce)
+ .await?;
+ loop {
+ match eventloop.poll().await? {
+ Event::Incoming(Incoming::Publish(p)) => {
+ let topic = p.topic.clone();
+ if let Some(serial) = extract_serial_from_topic(&topic) {
+ info!("Detected serial from topic: {}", serial);
+ return Ok(serial);
+ }
+ }
+ _ => {
+ // Ignore other events
+ }
+ }
+ }
+}
+
+/// Read values and update DBus properties immediately as each message arrives.
+/// Returns `Ok(true)` if all topics were received, `Ok(false)` on timeout with partial data.
+pub async fn read_values_and_update_properties_immediately(
+ client: &AsyncClient,
+ eventloop: &mut EventLoop,
+ serial: &str,
+ conn: &Connection,
+ dbus_state: &SharedState,
+) -> Result<bool> {
+ let mappings = MqttPropertyMappings::new(serial);
+ let topics = mappings.get_subscription_topics();
+
+ // Subscribe to all topics
+ for topic in &topics {
+ client.subscribe(*topic, QoS::AtLeastOnce).await?;
+ }
+ debug!(
+ "Subscribed to {} topics for serial={}",
+ topics.len(),
+ serial
+ );
+
+ let mut received_topics = std::collections::HashSet::new();
+ let total_topics = topics.len();
+
+ // We'll wait up to 30 seconds for all values
+ let timeout = time::sleep(Duration::from_secs(30));
+ tokio::pin!(timeout);
+
+ let all_received = loop {
+ select! {
+ _ = &mut timeout => {
+ warn!("Timeout while waiting for all values, received {} of {}", received_topics.len(), total_topics);
+ break false;
+ }
+ ev = eventloop.poll() => {
+ match ev? {
+ Event::Incoming(Incoming::Publish(p)) => {
+ let topic = p.topic.clone();
+ let payload_str = String::from_utf8_lossy(&p.payload).to_string();
+
+ if let Some(mapping) = mappings.find_mapping(&topic) {
+ let value = parse_numeric_value(Some(&payload_str));
+ update_single_property(conn, dbus_state, &mapping.property_type, value).await?;
+ received_topics.insert(topic.clone());
+ info!("Updated {:?} with value {} ({} of {} topics received)",
+ mapping.property_type, value, received_topics.len(), total_topics);
+ } else if mappings.is_batteries_topic(&topic)
+ && let Some((soc, power, state_str)) = parse_batteries_payload(&payload_str)
+ {
+ update_single_property(conn, dbus_state, &PropertyType::BatterySoc, soc).await?;
+ update_single_property(conn, dbus_state, &PropertyType::BatteryPower, power).await?;
+ update_battery_state_str(conn, dbus_state, &state_str).await?;
+ received_topics.insert(topic.clone());
+ info!("Updated battery from Batteries topic: soc={}, power={}, state={} ({} of {} topics received)",
+ soc, power, state_str, received_topics.len(), total_topics);
+ }
+
+ if received_topics.len() >= total_topics {
+ info!("All {} properties updated successfully", total_topics);
+ break true;
+ }
+ }
+ _ => {
+ // Ignore other events (ConnAck, SubAck, etc.)
+ }
+ }
+ }
+ }
+ };
+
+ Ok(all_received)
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use tokio::net::TcpListener;
+
+ #[test]
+ fn test_extract_serial_from_topic() {
+ assert_eq!(
+ extract_serial_from_topic("N/abc123/system/0/Serial"),
+ Some("abc123".to_string())
+ );
+ assert_eq!(
+ extract_serial_from_topic("N/xyz/system/0/Serial"),
+ Some("xyz".to_string())
+ );
+ assert_eq!(
+ extract_serial_from_topic("N//system/0/Serial"),
+ Some("".to_string())
+ );
+ assert_eq!(extract_serial_from_topic("N/abc123/system/0/Other"), None);
+ assert_eq!(extract_serial_from_topic("foo/bar"), None);
+ }
+
+ #[tokio::test]
+ async fn test_pick_first_available_picks_open_second() {
+ // Start a listener for the second endpoint only
+ let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
+ let addr = listener.local_addr().unwrap();
+
+ let endpoints = vec![
+ ("127.0.0.1".to_string(), 1), // very likely closed
+ (addr.ip().to_string(), addr.port()),
+ ];
+
+ let chosen = pick_first_available(&endpoints).await.unwrap();
+ assert_eq!(chosen, (addr.ip().to_string(), addr.port()));
+
+ drop(listener);
+ }
+
+ #[tokio::test]
+ async fn test_pick_first_available_prefers_first_when_both_open() {
+ let listener1 = TcpListener::bind("127.0.0.1:0").await.unwrap();
+ let addr1 = listener1.local_addr().unwrap();
+ let listener2 = TcpListener::bind("127.0.0.1:0").await.unwrap();
+ let addr2 = listener2.local_addr().unwrap();
+
+ let endpoints = vec![
+ (addr1.ip().to_string(), addr1.port()),
+ (addr2.ip().to_string(), addr2.port()),
+ ];
+
+ let chosen = pick_first_available(&endpoints).await.unwrap();
+ assert_eq!(chosen, (addr1.ip().to_string(), addr1.port()));
+
+ drop(listener1);
+ drop(listener2);
+ }
+
+ #[test]
+ fn test_parse_numeric_value_direct() {
+ let value = "123.45".to_string();
+ let input = Some(&value);
+ assert_eq!(parse_numeric_value(input), 123.45);
+ }
+
+ #[test]
+ fn test_parse_numeric_value_json() {
+ let value = r#"{"value": 67.89}"#.to_string();
+ let input = Some(&value);
+ assert_eq!(parse_numeric_value(input), 67.89);
+ }
+
+ #[test]
+ fn test_parse_numeric_value_json_string() {
+ let value = r#"{"value": "90.12"}"#.to_string();
+ let input = Some(&value);
+ assert_eq!(parse_numeric_value(input), 90.12);
+ }
+
+ #[test]
+ fn test_parse_numeric_value_invalid() {
+ let value = "invalid".to_string();
+ let input = Some(&value);
+ assert_eq!(parse_numeric_value(input), 0.0);
+ }
+
+ #[test]
+ fn test_parse_numeric_value_none() {
+ let input = None;
+ assert_eq!(parse_numeric_value(input), 0.0);
+ }
+
+ #[test]
+ fn test_parse_numeric_value_whitespace() {
+ let value = " 42.5 ".to_string();
+ let input = Some(&value);
+ assert_eq!(parse_numeric_value(input), 42.5);
+ }
+
+ #[test]
+ fn test_topic_property_mapping() {
+ let mappings = MqttPropertyMappings::new("test123");
+ let topics = mappings.get_subscription_topics();
+
+ assert_eq!(topics.len(), 4);
+ assert!(topics.contains(&"N/test123/solarcharger/277/Yield/Power"));
+ assert!(topics.contains(&"N/test123/system/0/Ac/Grid/L1/Power"));
+ assert!(topics.contains(&"N/test123/system/0/Ac/Consumption/L1/Power"));
+ assert!(topics.contains(&"N/test123/system/0/Batteries"));
+
+ // Test finding solar mapping
+ let solar_mapping = mappings.find_mapping("N/test123/solarcharger/277/Yield/Power");
+ assert!(solar_mapping.is_some());
+ assert_eq!(
+ solar_mapping.unwrap().property_type,
+ PropertyType::SolarPower
+ );
+
+ // Test finding AC input mapping
+ let ac_input_mapping = mappings.find_mapping("N/test123/system/0/Ac/Grid/L1/Power");
+ assert!(ac_input_mapping.is_some());
+ assert_eq!(
+ ac_input_mapping.unwrap().property_type,
+ PropertyType::AcInputPower
+ );
+
+ // Test finding AC load mapping
+ let ac_load_mapping = mappings.find_mapping("N/test123/system/0/Ac/Consumption/L1/Power");
+ assert!(ac_load_mapping.is_some());
+ assert_eq!(
+ ac_load_mapping.unwrap().property_type,
+ PropertyType::AcLoadPower
+ );
+
+ // Batteries topic is handled specially, not in find_mapping
+ assert!(
+ mappings
+ .find_mapping("N/test123/system/0/Batteries")
+ .is_none()
+ );
+ assert!(mappings.is_batteries_topic("N/test123/system/0/Batteries"));
+ }
+
+ // --- parse_numeric_value: fallback chain and design decisions ---
+
+ #[test]
+ fn test_parse_numeric_value_negative() {
+ let value = "-123.45".to_string();
+ assert_eq!(parse_numeric_value(Some(&value)), -123.45);
+ }
+
+ #[test]
+ fn test_parse_numeric_value_json_negative() {
+ let value = r#"{"value": -67.89}"#.to_string();
+ assert_eq!(parse_numeric_value(Some(&value)), -67.89);
+ }
+
+ #[test]
+ fn test_parse_numeric_value_json_null() {
+ let value = r#"{"value": null}"#.to_string();
+ assert_eq!(parse_numeric_value(Some(&value)), 0.0);
+ }
+
+ #[test]
+ fn test_parse_numeric_value_json_missing_value_key() {
+ let value = r#"{"power": 42}"#.to_string();
+ assert_eq!(parse_numeric_value(Some(&value)), 0.0);
+ }
+
+ #[test]
+ fn test_parse_numeric_value_empty_string() {
+ let value = "".to_string();
+ assert_eq!(parse_numeric_value(Some(&value)), 0.0);
+ }
+
+ #[test]
+ fn test_parse_numeric_value_json_with_extra_fields() {
+ let value = r#"{"value": 42, "unit": "W"}"#.to_string();
+ assert_eq!(parse_numeric_value(Some(&value)), 42.0);
+ }
+
+ #[test]
+ fn test_parse_numeric_value_json_non_numeric_string() {
+ let value = r#"{"value": "n/a"}"#.to_string();
+ assert_eq!(parse_numeric_value(Some(&value)), 0.0);
+ }
+
+ // --- extract_serial_from_topic: boundary cases ---
+
+ #[test]
+ fn test_extract_serial_empty_topic() {
+ assert_eq!(extract_serial_from_topic(""), None);
+ }
+
+ #[test]
+ fn test_extract_serial_data_topic_rejected() {
+ // Data topics must not be mistaken for serial discovery topics
+ assert_eq!(
+ extract_serial_from_topic("N/abc/battery/278/Dc/0/Power"),
+ None
+ );
+ }
+
+ // --- MqttPropertyMappings: Victron device mapping design ---
+
+ #[test]
+ fn test_mapping_find_returns_none_for_unknown_topic() {
+ let mappings = MqttPropertyMappings::new("serial1");
+ assert!(mappings.find_mapping("N/serial1/unknown/0/Foo").is_none());
+ }
+
+ #[test]
+ fn test_mapping_solar_power_exists() {
+ let mappings = MqttPropertyMappings::new("s1");
+ let solar = mappings
+ .find_mapping("N/s1/solarcharger/277/Yield/Power")
+ .expect("solar mapping should exist");
+ assert_eq!(solar.property_type, PropertyType::SolarPower);
+ }
+
+ #[test]
+ fn test_mapping_different_serials_dont_cross_match() {
+ let mappings_a = MqttPropertyMappings::new("AAA");
+ let mappings_b = MqttPropertyMappings::new("BBB");
+ // Simple mappings from serial AAA must not match in serial BBB
+ for topic in mappings_a.get_subscription_topics() {
+ assert!(
+ mappings_b.find_mapping(topic).is_none(),
+ "serial BBB should not match topic from serial AAA: {topic}"
+ );
+ if mappings_a.is_batteries_topic(topic) {
+ assert!(
+ !mappings_b.is_batteries_topic(topic),
+ "serial BBB batteries topic should not match serial AAA: {topic}"
+ );
+ }
+ }
+ }
+
+ #[test]
+ fn test_parse_batteries_payload_charging() {
+ let payload = r#"{"value":[{"active_battery_service":true,"current":1.8,"id":"com.victronenergy.battery.ttyS7","instance":279,"name":"Shunt","power":48.13,"soc":67.1,"state":1,"voltage":26.74}]}"#;
+ let (soc, power, state) = parse_batteries_payload(payload).unwrap();
+ assert!((soc - 67.1).abs() < 0.01);
+ assert!((power - 48.13).abs() < 0.01);
+ assert_eq!(state, "charging");
+ }
+
+ #[test]
+ fn test_parse_batteries_payload_discharging() {
+ let payload = r#"{"value":[{"soc":50.0,"power":-334.78,"state":2}]}"#;
+ let (soc, power, state) = parse_batteries_payload(payload).unwrap();
+ assert!((soc - 50.0).abs() < 0.01);
+ assert!((power - (-334.78)).abs() < 0.01);
+ assert_eq!(state, "discharging");
+ }
+
+ #[test]
+ fn test_parse_batteries_payload_idle() {
+ let payload = r#"{"value":[{"soc":100.0,"power":0.0,"state":0}]}"#;
+ let (soc, _power, state) = parse_batteries_payload(payload).unwrap();
+ assert!((soc - 100.0).abs() < 0.01);
+ assert_eq!(state, "idle");
+ }
+
+ #[test]
+ fn test_parse_batteries_payload_invalid() {
+ assert!(parse_batteries_payload("not json").is_none());
+ assert!(parse_batteries_payload(r#"{"value":[]}"#).is_none());
+ assert!(parse_batteries_payload(r#"{"value":"nope"}"#).is_none());
+ }
+
+ #[test]
+ fn test_mapping_topics_embed_serial() {
+ let serial = "mySerial42";
+ let mappings = MqttPropertyMappings::new(serial);
+ for topic in mappings.get_subscription_topics() {
+ assert!(
+ topic.contains(serial),
+ "topic should contain serial: {topic}"
+ );
+ }
+ }
+
+ // --- build_mqtt_options: configuration policy ---
+
+ #[test]
+ fn test_build_mqtt_options_applies_credentials_when_present() {
+ let cfg = Config {
+ endpoints: vec![("127.0.0.1".to_string(), 1883)],
+ username: Some("user".to_string()),
+ password: Some("pass".to_string()),
+ client_id: "test-client".to_string(),
+ refresh_interval_seconds: 60,
+ };
+ let opts = build_mqtt_options(&cfg, "127.0.0.1", 1883);
+ let login = opts.credentials().expect("credentials should be set");
+ assert_eq!(login.username, "user");
+ assert_eq!(login.password, "pass");
+ assert_eq!(opts.keep_alive(), Duration::from_secs(10));
+ }
+
+ #[test]
+ fn test_build_mqtt_options_skips_credentials_when_absent() {
+ let cfg = Config {
+ endpoints: vec![("127.0.0.1".to_string(), 1883)],
+ username: None,
+ password: None,
+ client_id: "anon-client".to_string(),
+ refresh_interval_seconds: 60,
+ };
+ let opts = build_mqtt_options(&cfg, "127.0.0.1", 1883);
+ assert!(
+ opts.credentials().is_none(),
+ "anonymous mode should have no credentials"
+ );
+ assert_eq!(opts.client_id(), "anon-client");
+ assert_eq!(opts.keep_alive(), Duration::from_secs(10));
+ }
+}