use std::sync::Arc; use std::time::Duration; use anyhow::Result; use camper_widget_refresh::cache::{read_serial_cache, write_serial_cache}; use camper_widget_refresh::config::{Config, listen_for_config_changes, load_config}; use camper_widget_refresh::dbus::{self, start_service, update_connected_status}; use camper_widget_refresh::victron_mqtt::{ build_mqtt_options, disconnect_mqtt, pick_first_available, read_values_and_update_properties_immediately, wait_for_serial, }; use rumqttc::AsyncClient; use tokio::sync::RwLock; use tokio::task::JoinHandle; use tokio::time; use tokio_util::sync::CancellationToken; use tracing::{error, info, warn}; const MAX_CONFIG_LISTENER_BACKOFF_SECS: u64 = 30; async fn refresh_data( cfg: &Config, conn: &zbus::Connection, dbus_state: &dbus::SharedState, ) -> Result<()> { let (host, port) = pick_first_available(&cfg.endpoints).await?; info!("Connecting to MQTT {}:{} as {}", host, port, cfg.client_id); // Set connected to true when we found a reachable host update_connected_status(conn, dbus_state, true).await?; let mqttoptions = build_mqtt_options(cfg, &host, port); let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10); // Try cache for serial first let mut serial: Option = match read_serial_cache().await { Ok(Some(s)) => { info!("Using cached serial: {}", s); Some(s) } Ok(None) => None, Err(e) => { warn!("Failed reading cache: {}", e); None } }; if serial.is_none() { info!("Waiting for serial on N/+/system/0/Serial..."); let s = wait_for_serial(&client, &mut eventloop).await?; // Long TTL 30 days if let Err(e) = write_serial_cache(&s, Duration::from_secs(30 * 24 * 3600)).await { warn!("Failed to write cache: {}", e); } serial = Some(s); } let serial = serial.expect("serial should be set"); // Read values and update DBus properties immediately as each MQTT message arrives let result = read_values_and_update_properties_immediately( &client, &mut eventloop, &serial, conn, dbus_state, ) .await; disconnect_mqtt(&client).await; match result? { true => { info!("Completed refresh cycle - all properties updated"); } false => { warn!("Refresh cycle completed with partial data - some MQTT topics timed out"); } } Ok(()) } fn spawn_config_listener( conn: zbus::Connection, config_state: Arc>, shared_state: dbus::SharedState, token: CancellationToken, ) -> JoinHandle<()> { tokio::spawn(async move { if let Err(e) = listen_for_config_changes(&conn, config_state, &shared_state, token).await { error!("Config change listener failed: {}", e); } }) } #[tokio::main] async fn main() -> Result<()> { // Initialize tracing early so debug logs from config loading are visible let env_filter = tracing_subscriber::EnvFilter::try_from_default_env() .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")); let _ = tracing::subscriber::set_global_default( tracing_subscriber::fmt() .with_env_filter(env_filter) .with_target(false) .finish(), ); let initial_cfg = load_config().await; // Start D-Bus service and keep connection/state alive let (dbus_conn, dbus_state) = start_service().await?; // Create shared config state let config_state = Arc::new(RwLock::new(initial_cfg)); // Graceful shutdown via CancellationToken let shutdown_token = CancellationToken::new(); let ctrl_c_token = shutdown_token.clone(); tokio::spawn(async move { if let Err(e) = tokio::signal::ctrl_c().await { error!("Failed to listen for ctrl-c: {}", e); return; } info!("Received shutdown signal, initiating graceful shutdown"); ctrl_c_token.cancel(); }); info!( "Starting camper-widget-refresh with refresh interval: {} seconds", config_state.read().await.refresh_interval_seconds ); // Spawn the config change listener with supervision let mut config_listener_handle = spawn_config_listener( dbus_conn.clone(), config_state.clone(), dbus_state.clone(), shutdown_token.child_token(), ); let mut config_backoff_secs: u64 = 1; // Run initial refresh { let cfg = config_state.read().await; if let Err(e) = refresh_data(&cfg, &dbus_conn, &dbus_state).await { warn!("Initial refresh failed: {}", e); // Set connected to false when refresh fails if let Err(update_err) = update_connected_status(&dbus_conn, &dbus_state, false).await { warn!("Failed to update connected status: {}", update_err); } } } // Run refresh loop with dynamic interval while !shutdown_token.is_cancelled() { // Supervise config listener — respawn with backoff if it died if config_listener_handle.is_finished() && !shutdown_token.is_cancelled() { error!( "Config listener exited unexpectedly, respawning in {}s", config_backoff_secs ); time::sleep(Duration::from_secs(config_backoff_secs)).await; config_backoff_secs = (config_backoff_secs * 2).min(MAX_CONFIG_LISTENER_BACKOFF_SECS); config_listener_handle = spawn_config_listener( dbus_conn.clone(), config_state.clone(), dbus_state.clone(), shutdown_token.child_token(), ); } let refresh_interval = { let cfg = config_state.read().await; Duration::from_secs(cfg.refresh_interval_seconds) }; // Race sleep against shutdown tokio::select! { _ = time::sleep(refresh_interval) => {} _ = shutdown_token.cancelled() => break, } let cfg = config_state.read().await; match refresh_data(&cfg, &dbus_conn, &dbus_state).await { Ok(()) => { // Successful refresh — reset config listener backoff config_backoff_secs = 1; } Err(e) => { warn!("Refresh failed: {}", e); if let Err(update_err) = update_connected_status(&dbus_conn, &dbus_state, false).await { warn!("Failed to update connected status: {}", update_err); } } } } info!("Shutting down cleanly"); Ok(()) } #[cfg(test)] mod tests { // parse_numeric_value tests moved to victron_mqtt.rs #[test] fn test_battery_direction_mapping() { // Test the direction mapping logic let test_cases = vec![ (10.0, "charging"), (-10.0, "discharging"), (0.0, "idle"), (3.0, "idle"), // Below threshold (-3.0, "idle"), // Above negative threshold (5.1, "charging"), // Just above threshold (-5.1, "discharging"), // Just below negative threshold ]; for (power, expected) in test_cases { let direction = if power > 5.0 { "charging".to_string() } else if power < -5.0 { "discharging".to_string() } else { "idle".to_string() }; assert_eq!( direction, expected, "Power {power} should map to {expected}" ); } } }