use std::time::Duration; use crate::config::Config; use crate::dbus::{PropertyType, SharedState, update_battery_state_str, update_single_property}; use anyhow::{Result, anyhow}; use rumqttc::{AsyncClient, Event, EventLoop, Incoming, MqttOptions, QoS}; use tokio::{net::TcpStream, select, time}; use tracing::{debug, info, warn}; use zbus::Connection; // Topic-to-property mapping structure (for simple 1:1 topic→property mappings) #[derive(Debug, Clone)] pub struct TopicPropertyMapping { pub topic_pattern: String, pub property_type: PropertyType, } impl TopicPropertyMapping { pub fn new(topic_pattern: String, property_type: PropertyType) -> Self { Self { topic_pattern, property_type, } } } // Collection of all topic mappings for a given serial pub struct MqttPropertyMappings { mappings: Vec, batteries_topic: String, // system/0/Batteries — yields SoC, Power, State } impl MqttPropertyMappings { pub fn new(serial: &str) -> Self { Self { mappings: vec![ TopicPropertyMapping::new( format!("N/{serial}/solarcharger/277/Yield/Power"), PropertyType::SolarPower, ), TopicPropertyMapping::new( format!("N/{serial}/system/0/Ac/Grid/L1/Power"), PropertyType::AcInputPower, ), TopicPropertyMapping::new( format!("N/{serial}/system/0/Ac/Consumption/L1/Power"), PropertyType::AcLoadPower, ), ], batteries_topic: format!("N/{serial}/system/0/Batteries"), } } // Find mapping for a given topic (simple 1:1 mappings only) pub fn find_mapping(&self, topic: &str) -> Option<&TopicPropertyMapping> { self.mappings.iter().find(|m| m.topic_pattern == topic) } // Check if topic is the composite Batteries topic pub fn is_batteries_topic(&self, topic: &str) -> bool { self.batteries_topic == topic } // Get all topic patterns for subscription pub fn get_subscription_topics(&self) -> Vec<&str> { let mut topics: Vec<&str> = self .mappings .iter() .map(|m| m.topic_pattern.as_str()) .collect(); topics.push(&self.batteries_topic); topics } } /// Parse the system/0/Batteries JSON payload. /// Returns (soc, power, state_string) from the first battery entry. pub fn parse_batteries_payload(payload: &str) -> Option<(f64, f64, String)> { let json: serde_json::Value = serde_json::from_str(payload).ok()?; let batteries = json.get("value")?.as_array()?; let battery = batteries.first()?; let soc = battery.get("soc")?.as_f64()?; let power = battery.get("power")?.as_f64()?; let state_num = battery.get("state")?.as_i64().unwrap_or(0); let state_str = match state_num { 1 => "charging", 2 => "discharging", _ => "idle", }; Some((soc, power, state_str.to_string())) } // Parse numeric value from MQTT payload (moved from main.rs) fn parse_numeric_value(s: Option<&String>) -> f64 { if let Some(raw) = s { // First try direct number parsing if let Ok(v) = raw.trim().parse::() { return v; } // Try JSON object with {"value": number} if let Ok(val) = serde_json::from_str::(raw) && let Some(v) = val.get("value") { if let Some(n) = v.as_f64() { return n; } if let Some(s) = v.as_str() && let Ok(n) = s.trim().parse::() { return n; } } } 0.0 } // Returns Some(serial) when topic is like "N/{serial}/system/0/Serial" pub fn extract_serial_from_topic(topic: &str) -> Option { if topic.starts_with("N/") && topic.ends_with("/system/0/Serial") { let parts: Vec<&str> = topic.split('/').collect(); if parts.len() >= 3 { return Some(parts[1].to_string()); } } None } pub async fn disconnect_mqtt(client: &AsyncClient) { if let Err(e) = client.disconnect().await { warn!("MQTT disconnect failed: {}", e); } } pub fn build_mqtt_options(cfg: &Config, host: &str, port: u16) -> MqttOptions { let mut opts = MqttOptions::new(&cfg.client_id, host, port); opts.set_keep_alive(Duration::from_secs(10)); if let (Some(u), Some(p)) = (&cfg.username, &cfg.password) { opts.set_credentials(u, p); } opts } pub async fn pick_first_available(endpoints: &[(String, u16)]) -> Result<(String, u16)> { for (host, port) in endpoints { let addr = (host.as_str(), *port); let attempt = time::timeout(Duration::from_secs(2), TcpStream::connect(addr)).await; match attempt { Ok(Ok(stream)) => { // Successfully connected; close and return this endpoint drop(stream); return Ok((host.clone(), *port)); } _ => { // Try next } } } Err(anyhow!("No reachable MQTT endpoint from config")) } pub async fn wait_for_serial(client: &AsyncClient, eventloop: &mut EventLoop) -> Result { // Subscribe to N/+/system/0/Serial client .subscribe("N/+/system/0/Serial", QoS::AtLeastOnce) .await?; loop { match eventloop.poll().await? { Event::Incoming(Incoming::Publish(p)) => { let topic = p.topic.clone(); if let Some(serial) = extract_serial_from_topic(&topic) { info!("Detected serial from topic: {}", serial); return Ok(serial); } } _ => { // Ignore other events } } } } /// Read values and update DBus properties immediately as each message arrives. /// Returns `Ok(true)` if all topics were received, `Ok(false)` on timeout with partial data. pub async fn read_values_and_update_properties_immediately( client: &AsyncClient, eventloop: &mut EventLoop, serial: &str, conn: &Connection, dbus_state: &SharedState, ) -> Result { let mappings = MqttPropertyMappings::new(serial); let topics = mappings.get_subscription_topics(); // Subscribe to all topics for topic in &topics { client.subscribe(*topic, QoS::AtLeastOnce).await?; } debug!( "Subscribed to {} topics for serial={}", topics.len(), serial ); let mut received_topics = std::collections::HashSet::new(); let total_topics = topics.len(); // We'll wait up to 30 seconds for all values let timeout = time::sleep(Duration::from_secs(30)); tokio::pin!(timeout); let all_received = loop { select! { _ = &mut timeout => { warn!("Timeout while waiting for all values, received {} of {}", received_topics.len(), total_topics); break false; } ev = eventloop.poll() => { match ev? { Event::Incoming(Incoming::Publish(p)) => { let topic = p.topic.clone(); let payload_str = String::from_utf8_lossy(&p.payload).to_string(); if let Some(mapping) = mappings.find_mapping(&topic) { let value = parse_numeric_value(Some(&payload_str)); update_single_property(conn, dbus_state, &mapping.property_type, value).await?; received_topics.insert(topic.clone()); info!("Updated {:?} with value {} ({} of {} topics received)", mapping.property_type, value, received_topics.len(), total_topics); } else if mappings.is_batteries_topic(&topic) && let Some((soc, power, state_str)) = parse_batteries_payload(&payload_str) { update_single_property(conn, dbus_state, &PropertyType::BatterySoc, soc).await?; update_single_property(conn, dbus_state, &PropertyType::BatteryPower, power).await?; update_battery_state_str(conn, dbus_state, &state_str).await?; received_topics.insert(topic.clone()); info!("Updated battery from Batteries topic: soc={}, power={}, state={} ({} of {} topics received)", soc, power, state_str, received_topics.len(), total_topics); } if received_topics.len() >= total_topics { info!("All {} properties updated successfully", total_topics); break true; } } _ => { // Ignore other events (ConnAck, SubAck, etc.) } } } } }; Ok(all_received) } #[cfg(test)] mod tests { use super::*; use tokio::net::TcpListener; #[test] fn test_extract_serial_from_topic() { assert_eq!( extract_serial_from_topic("N/abc123/system/0/Serial"), Some("abc123".to_string()) ); assert_eq!( extract_serial_from_topic("N/xyz/system/0/Serial"), Some("xyz".to_string()) ); assert_eq!( extract_serial_from_topic("N//system/0/Serial"), Some("".to_string()) ); assert_eq!(extract_serial_from_topic("N/abc123/system/0/Other"), None); assert_eq!(extract_serial_from_topic("foo/bar"), None); } #[tokio::test] async fn test_pick_first_available_picks_open_second() { // Start a listener for the second endpoint only let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let addr = listener.local_addr().unwrap(); let endpoints = vec![ ("127.0.0.1".to_string(), 1), // very likely closed (addr.ip().to_string(), addr.port()), ]; let chosen = pick_first_available(&endpoints).await.unwrap(); assert_eq!(chosen, (addr.ip().to_string(), addr.port())); drop(listener); } #[tokio::test] async fn test_pick_first_available_prefers_first_when_both_open() { let listener1 = TcpListener::bind("127.0.0.1:0").await.unwrap(); let addr1 = listener1.local_addr().unwrap(); let listener2 = TcpListener::bind("127.0.0.1:0").await.unwrap(); let addr2 = listener2.local_addr().unwrap(); let endpoints = vec![ (addr1.ip().to_string(), addr1.port()), (addr2.ip().to_string(), addr2.port()), ]; let chosen = pick_first_available(&endpoints).await.unwrap(); assert_eq!(chosen, (addr1.ip().to_string(), addr1.port())); drop(listener1); drop(listener2); } #[test] fn test_parse_numeric_value_direct() { let value = "123.45".to_string(); let input = Some(&value); assert_eq!(parse_numeric_value(input), 123.45); } #[test] fn test_parse_numeric_value_json() { let value = r#"{"value": 67.89}"#.to_string(); let input = Some(&value); assert_eq!(parse_numeric_value(input), 67.89); } #[test] fn test_parse_numeric_value_json_string() { let value = r#"{"value": "90.12"}"#.to_string(); let input = Some(&value); assert_eq!(parse_numeric_value(input), 90.12); } #[test] fn test_parse_numeric_value_invalid() { let value = "invalid".to_string(); let input = Some(&value); assert_eq!(parse_numeric_value(input), 0.0); } #[test] fn test_parse_numeric_value_none() { let input = None; assert_eq!(parse_numeric_value(input), 0.0); } #[test] fn test_parse_numeric_value_whitespace() { let value = " 42.5 ".to_string(); let input = Some(&value); assert_eq!(parse_numeric_value(input), 42.5); } #[test] fn test_topic_property_mapping() { let mappings = MqttPropertyMappings::new("test123"); let topics = mappings.get_subscription_topics(); assert_eq!(topics.len(), 4); assert!(topics.contains(&"N/test123/solarcharger/277/Yield/Power")); assert!(topics.contains(&"N/test123/system/0/Ac/Grid/L1/Power")); assert!(topics.contains(&"N/test123/system/0/Ac/Consumption/L1/Power")); assert!(topics.contains(&"N/test123/system/0/Batteries")); // Test finding solar mapping let solar_mapping = mappings.find_mapping("N/test123/solarcharger/277/Yield/Power"); assert!(solar_mapping.is_some()); assert_eq!( solar_mapping.unwrap().property_type, PropertyType::SolarPower ); // Test finding AC input mapping let ac_input_mapping = mappings.find_mapping("N/test123/system/0/Ac/Grid/L1/Power"); assert!(ac_input_mapping.is_some()); assert_eq!( ac_input_mapping.unwrap().property_type, PropertyType::AcInputPower ); // Test finding AC load mapping let ac_load_mapping = mappings.find_mapping("N/test123/system/0/Ac/Consumption/L1/Power"); assert!(ac_load_mapping.is_some()); assert_eq!( ac_load_mapping.unwrap().property_type, PropertyType::AcLoadPower ); // Batteries topic is handled specially, not in find_mapping assert!( mappings .find_mapping("N/test123/system/0/Batteries") .is_none() ); assert!(mappings.is_batteries_topic("N/test123/system/0/Batteries")); } // --- parse_numeric_value: fallback chain and design decisions --- #[test] fn test_parse_numeric_value_negative() { let value = "-123.45".to_string(); assert_eq!(parse_numeric_value(Some(&value)), -123.45); } #[test] fn test_parse_numeric_value_json_negative() { let value = r#"{"value": -67.89}"#.to_string(); assert_eq!(parse_numeric_value(Some(&value)), -67.89); } #[test] fn test_parse_numeric_value_json_null() { let value = r#"{"value": null}"#.to_string(); assert_eq!(parse_numeric_value(Some(&value)), 0.0); } #[test] fn test_parse_numeric_value_json_missing_value_key() { let value = r#"{"power": 42}"#.to_string(); assert_eq!(parse_numeric_value(Some(&value)), 0.0); } #[test] fn test_parse_numeric_value_empty_string() { let value = "".to_string(); assert_eq!(parse_numeric_value(Some(&value)), 0.0); } #[test] fn test_parse_numeric_value_json_with_extra_fields() { let value = r#"{"value": 42, "unit": "W"}"#.to_string(); assert_eq!(parse_numeric_value(Some(&value)), 42.0); } #[test] fn test_parse_numeric_value_json_non_numeric_string() { let value = r#"{"value": "n/a"}"#.to_string(); assert_eq!(parse_numeric_value(Some(&value)), 0.0); } // --- extract_serial_from_topic: boundary cases --- #[test] fn test_extract_serial_empty_topic() { assert_eq!(extract_serial_from_topic(""), None); } #[test] fn test_extract_serial_data_topic_rejected() { // Data topics must not be mistaken for serial discovery topics assert_eq!( extract_serial_from_topic("N/abc/battery/278/Dc/0/Power"), None ); } // --- MqttPropertyMappings: Victron device mapping design --- #[test] fn test_mapping_find_returns_none_for_unknown_topic() { let mappings = MqttPropertyMappings::new("serial1"); assert!(mappings.find_mapping("N/serial1/unknown/0/Foo").is_none()); } #[test] fn test_mapping_solar_power_exists() { let mappings = MqttPropertyMappings::new("s1"); let solar = mappings .find_mapping("N/s1/solarcharger/277/Yield/Power") .expect("solar mapping should exist"); assert_eq!(solar.property_type, PropertyType::SolarPower); } #[test] fn test_mapping_different_serials_dont_cross_match() { let mappings_a = MqttPropertyMappings::new("AAA"); let mappings_b = MqttPropertyMappings::new("BBB"); // Simple mappings from serial AAA must not match in serial BBB for topic in mappings_a.get_subscription_topics() { assert!( mappings_b.find_mapping(topic).is_none(), "serial BBB should not match topic from serial AAA: {topic}" ); if mappings_a.is_batteries_topic(topic) { assert!( !mappings_b.is_batteries_topic(topic), "serial BBB batteries topic should not match serial AAA: {topic}" ); } } } #[test] fn test_parse_batteries_payload_charging() { let payload = r#"{"value":[{"active_battery_service":true,"current":1.8,"id":"com.victronenergy.battery.ttyS7","instance":279,"name":"Shunt","power":48.13,"soc":67.1,"state":1,"voltage":26.74}]}"#; let (soc, power, state) = parse_batteries_payload(payload).unwrap(); assert!((soc - 67.1).abs() < 0.01); assert!((power - 48.13).abs() < 0.01); assert_eq!(state, "charging"); } #[test] fn test_parse_batteries_payload_discharging() { let payload = r#"{"value":[{"soc":50.0,"power":-334.78,"state":2}]}"#; let (soc, power, state) = parse_batteries_payload(payload).unwrap(); assert!((soc - 50.0).abs() < 0.01); assert!((power - (-334.78)).abs() < 0.01); assert_eq!(state, "discharging"); } #[test] fn test_parse_batteries_payload_idle() { let payload = r#"{"value":[{"soc":100.0,"power":0.0,"state":0}]}"#; let (soc, _power, state) = parse_batteries_payload(payload).unwrap(); assert!((soc - 100.0).abs() < 0.01); assert_eq!(state, "idle"); } #[test] fn test_parse_batteries_payload_invalid() { assert!(parse_batteries_payload("not json").is_none()); assert!(parse_batteries_payload(r#"{"value":[]}"#).is_none()); assert!(parse_batteries_payload(r#"{"value":"nope"}"#).is_none()); } #[test] fn test_mapping_topics_embed_serial() { let serial = "mySerial42"; let mappings = MqttPropertyMappings::new(serial); for topic in mappings.get_subscription_topics() { assert!( topic.contains(serial), "topic should contain serial: {topic}" ); } } // --- build_mqtt_options: configuration policy --- #[test] fn test_build_mqtt_options_applies_credentials_when_present() { let cfg = Config { endpoints: vec![("127.0.0.1".to_string(), 1883)], username: Some("user".to_string()), password: Some("pass".to_string()), client_id: "test-client".to_string(), refresh_interval_seconds: 60, }; let opts = build_mqtt_options(&cfg, "127.0.0.1", 1883); let login = opts.credentials().expect("credentials should be set"); assert_eq!(login.username, "user"); assert_eq!(login.password, "pass"); assert_eq!(opts.keep_alive(), Duration::from_secs(10)); } #[test] fn test_build_mqtt_options_skips_credentials_when_absent() { let cfg = Config { endpoints: vec![("127.0.0.1".to_string(), 1883)], username: None, password: None, client_id: "anon-client".to_string(), refresh_interval_seconds: 60, }; let opts = build_mqtt_options(&cfg, "127.0.0.1", 1883); assert!( opts.credentials().is_none(), "anonymous mode should have no credentials" ); assert_eq!(opts.client_id(), "anon-client"); assert_eq!(opts.keep_alive(), Duration::from_secs(10)); } }