diff options
| author | Dawid Rycerz <dawid@rycerz.xyz> | 2026-02-07 17:29:48 +0100 |
|---|---|---|
| committer | Dawid Rycerz <dawid@rycerz.xyz> | 2026-02-07 17:29:48 +0100 |
| commit | 2eda97537b63d68b2e9ba06500e3fb491894d10c (patch) | |
| tree | 52873ad380cd97f4327765aac24659a2b00079b1 /service/src/main.rs | |
feat: camper van energy monitoring widget for Plasma 6main
Pure QML KPackage widget with Rust background service for real-time
Victron energy system monitoring via MQTT and D-Bus.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Diffstat (limited to 'service/src/main.rs')
| -rw-r--r-- | service/src/main.rs | 238 |
1 files changed, 238 insertions, 0 deletions
diff --git a/service/src/main.rs b/service/src/main.rs new file mode 100644 index 0000000..51f7ff7 --- /dev/null +++ b/service/src/main.rs @@ -0,0 +1,238 @@ +use std::sync::Arc; +use std::time::Duration; + +use anyhow::Result; +use camper_widget_refresh::cache::{read_serial_cache, write_serial_cache}; +use camper_widget_refresh::config::{Config, listen_for_config_changes, load_config}; +use camper_widget_refresh::dbus::{self, start_service, update_connected_status}; +use camper_widget_refresh::victron_mqtt::{ + build_mqtt_options, disconnect_mqtt, pick_first_available, + read_values_and_update_properties_immediately, wait_for_serial, +}; +use rumqttc::AsyncClient; +use tokio::sync::RwLock; +use tokio::task::JoinHandle; +use tokio::time; +use tokio_util::sync::CancellationToken; +use tracing::{error, info, warn}; + +const MAX_CONFIG_LISTENER_BACKOFF_SECS: u64 = 30; + +async fn refresh_data( + cfg: &Config, + conn: &zbus::Connection, + dbus_state: &dbus::SharedState, +) -> Result<()> { + let (host, port) = pick_first_available(&cfg.endpoints).await?; + info!("Connecting to MQTT {}:{} as {}", host, port, cfg.client_id); + + // Set connected to true when we found a reachable host + update_connected_status(conn, dbus_state, true).await?; + + let mqttoptions = build_mqtt_options(cfg, &host, port); + let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10); + + // Try cache for serial first + let mut serial: Option<String> = match read_serial_cache().await { + Ok(Some(s)) => { + info!("Using cached serial: {}", s); + Some(s) + } + Ok(None) => None, + Err(e) => { + warn!("Failed reading cache: {}", e); + None + } + }; + + if serial.is_none() { + info!("Waiting for serial on N/+/system/0/Serial..."); + let s = wait_for_serial(&client, &mut eventloop).await?; + // Long TTL 30 days + if let Err(e) = write_serial_cache(&s, Duration::from_secs(30 * 24 * 3600)).await { + warn!("Failed to write cache: {}", e); + } + serial = Some(s); + } + + let serial = serial.expect("serial should be set"); + + // Read values and update DBus properties immediately as each MQTT message arrives + let result = read_values_and_update_properties_immediately( + &client, + &mut eventloop, + &serial, + conn, + dbus_state, + ) + .await; + + disconnect_mqtt(&client).await; + + match result? { + true => { + info!("Completed refresh cycle - all properties updated"); + } + false => { + warn!("Refresh cycle completed with partial data - some MQTT topics timed out"); + } + } + + Ok(()) +} + +fn spawn_config_listener( + conn: zbus::Connection, + config_state: Arc<RwLock<Config>>, + shared_state: dbus::SharedState, + token: CancellationToken, +) -> JoinHandle<()> { + tokio::spawn(async move { + if let Err(e) = listen_for_config_changes(&conn, config_state, &shared_state, token).await { + error!("Config change listener failed: {}", e); + } + }) +} + +#[tokio::main] +async fn main() -> Result<()> { + // Initialize tracing early so debug logs from config loading are visible + let env_filter = tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")); + let _ = tracing::subscriber::set_global_default( + tracing_subscriber::fmt() + .with_env_filter(env_filter) + .with_target(false) + .finish(), + ); + + let initial_cfg = load_config().await; + + // Start D-Bus service and keep connection/state alive + let (dbus_conn, dbus_state) = start_service().await?; + + // Create shared config state + let config_state = Arc::new(RwLock::new(initial_cfg)); + + // Graceful shutdown via CancellationToken + let shutdown_token = CancellationToken::new(); + let ctrl_c_token = shutdown_token.clone(); + tokio::spawn(async move { + if let Err(e) = tokio::signal::ctrl_c().await { + error!("Failed to listen for ctrl-c: {}", e); + return; + } + info!("Received shutdown signal, initiating graceful shutdown"); + ctrl_c_token.cancel(); + }); + + info!( + "Starting camper-widget-refresh with refresh interval: {} seconds", + config_state.read().await.refresh_interval_seconds + ); + + // Spawn the config change listener with supervision + let mut config_listener_handle = spawn_config_listener( + dbus_conn.clone(), + config_state.clone(), + dbus_state.clone(), + shutdown_token.child_token(), + ); + let mut config_backoff_secs: u64 = 1; + + // Run initial refresh + { + let cfg = config_state.read().await; + if let Err(e) = refresh_data(&cfg, &dbus_conn, &dbus_state).await { + warn!("Initial refresh failed: {}", e); + // Set connected to false when refresh fails + if let Err(update_err) = update_connected_status(&dbus_conn, &dbus_state, false).await { + warn!("Failed to update connected status: {}", update_err); + } + } + } + + // Run refresh loop with dynamic interval + while !shutdown_token.is_cancelled() { + // Supervise config listener — respawn with backoff if it died + if config_listener_handle.is_finished() && !shutdown_token.is_cancelled() { + error!( + "Config listener exited unexpectedly, respawning in {}s", + config_backoff_secs + ); + time::sleep(Duration::from_secs(config_backoff_secs)).await; + config_backoff_secs = (config_backoff_secs * 2).min(MAX_CONFIG_LISTENER_BACKOFF_SECS); + config_listener_handle = spawn_config_listener( + dbus_conn.clone(), + config_state.clone(), + dbus_state.clone(), + shutdown_token.child_token(), + ); + } + + let refresh_interval = { + let cfg = config_state.read().await; + Duration::from_secs(cfg.refresh_interval_seconds) + }; + + // Race sleep against shutdown + tokio::select! { + _ = time::sleep(refresh_interval) => {} + _ = shutdown_token.cancelled() => break, + } + + let cfg = config_state.read().await; + match refresh_data(&cfg, &dbus_conn, &dbus_state).await { + Ok(()) => { + // Successful refresh — reset config listener backoff + config_backoff_secs = 1; + } + Err(e) => { + warn!("Refresh failed: {}", e); + if let Err(update_err) = + update_connected_status(&dbus_conn, &dbus_state, false).await + { + warn!("Failed to update connected status: {}", update_err); + } + } + } + } + + info!("Shutting down cleanly"); + Ok(()) +} + +#[cfg(test)] +mod tests { + + // parse_numeric_value tests moved to victron_mqtt.rs + + #[test] + fn test_battery_direction_mapping() { + // Test the direction mapping logic + let test_cases = vec![ + (10.0, "charging"), + (-10.0, "discharging"), + (0.0, "idle"), + (3.0, "idle"), // Below threshold + (-3.0, "idle"), // Above negative threshold + (5.1, "charging"), // Just above threshold + (-5.1, "discharging"), // Just below negative threshold + ]; + + for (power, expected) in test_cases { + let direction = if power > 5.0 { + "charging".to_string() + } else if power < -5.0 { + "discharging".to_string() + } else { + "idle".to_string() + }; + + assert_eq!( + direction, expected, + "Power {power} should map to {expected}" + ); + } + } +} |
