summaryrefslogtreecommitdiff
path: root/service/src/main.rs
diff options
context:
space:
mode:
authorDawid Rycerz <dawid@rycerz.xyz>2026-02-07 17:29:48 +0100
committerDawid Rycerz <dawid@rycerz.xyz>2026-02-07 17:29:48 +0100
commit2eda97537b63d68b2e9ba06500e3fb491894d10c (patch)
tree52873ad380cd97f4327765aac24659a2b00079b1 /service/src/main.rs
feat: camper van energy monitoring widget for Plasma 6main
Pure QML KPackage widget with Rust background service for real-time Victron energy system monitoring via MQTT and D-Bus. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Diffstat (limited to 'service/src/main.rs')
-rw-r--r--service/src/main.rs238
1 files changed, 238 insertions, 0 deletions
diff --git a/service/src/main.rs b/service/src/main.rs
new file mode 100644
index 0000000..51f7ff7
--- /dev/null
+++ b/service/src/main.rs
@@ -0,0 +1,238 @@
+use std::sync::Arc;
+use std::time::Duration;
+
+use anyhow::Result;
+use camper_widget_refresh::cache::{read_serial_cache, write_serial_cache};
+use camper_widget_refresh::config::{Config, listen_for_config_changes, load_config};
+use camper_widget_refresh::dbus::{self, start_service, update_connected_status};
+use camper_widget_refresh::victron_mqtt::{
+ build_mqtt_options, disconnect_mqtt, pick_first_available,
+ read_values_and_update_properties_immediately, wait_for_serial,
+};
+use rumqttc::AsyncClient;
+use tokio::sync::RwLock;
+use tokio::task::JoinHandle;
+use tokio::time;
+use tokio_util::sync::CancellationToken;
+use tracing::{error, info, warn};
+
+const MAX_CONFIG_LISTENER_BACKOFF_SECS: u64 = 30;
+
+async fn refresh_data(
+ cfg: &Config,
+ conn: &zbus::Connection,
+ dbus_state: &dbus::SharedState,
+) -> Result<()> {
+ let (host, port) = pick_first_available(&cfg.endpoints).await?;
+ info!("Connecting to MQTT {}:{} as {}", host, port, cfg.client_id);
+
+ // Set connected to true when we found a reachable host
+ update_connected_status(conn, dbus_state, true).await?;
+
+ let mqttoptions = build_mqtt_options(cfg, &host, port);
+ let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);
+
+ // Try cache for serial first
+ let mut serial: Option<String> = match read_serial_cache().await {
+ Ok(Some(s)) => {
+ info!("Using cached serial: {}", s);
+ Some(s)
+ }
+ Ok(None) => None,
+ Err(e) => {
+ warn!("Failed reading cache: {}", e);
+ None
+ }
+ };
+
+ if serial.is_none() {
+ info!("Waiting for serial on N/+/system/0/Serial...");
+ let s = wait_for_serial(&client, &mut eventloop).await?;
+ // Long TTL 30 days
+ if let Err(e) = write_serial_cache(&s, Duration::from_secs(30 * 24 * 3600)).await {
+ warn!("Failed to write cache: {}", e);
+ }
+ serial = Some(s);
+ }
+
+ let serial = serial.expect("serial should be set");
+
+ // Read values and update DBus properties immediately as each MQTT message arrives
+ let result = read_values_and_update_properties_immediately(
+ &client,
+ &mut eventloop,
+ &serial,
+ conn,
+ dbus_state,
+ )
+ .await;
+
+ disconnect_mqtt(&client).await;
+
+ match result? {
+ true => {
+ info!("Completed refresh cycle - all properties updated");
+ }
+ false => {
+ warn!("Refresh cycle completed with partial data - some MQTT topics timed out");
+ }
+ }
+
+ Ok(())
+}
+
+fn spawn_config_listener(
+ conn: zbus::Connection,
+ config_state: Arc<RwLock<Config>>,
+ shared_state: dbus::SharedState,
+ token: CancellationToken,
+) -> JoinHandle<()> {
+ tokio::spawn(async move {
+ if let Err(e) = listen_for_config_changes(&conn, config_state, &shared_state, token).await {
+ error!("Config change listener failed: {}", e);
+ }
+ })
+}
+
+#[tokio::main]
+async fn main() -> Result<()> {
+ // Initialize tracing early so debug logs from config loading are visible
+ let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
+ .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"));
+ let _ = tracing::subscriber::set_global_default(
+ tracing_subscriber::fmt()
+ .with_env_filter(env_filter)
+ .with_target(false)
+ .finish(),
+ );
+
+ let initial_cfg = load_config().await;
+
+ // Start D-Bus service and keep connection/state alive
+ let (dbus_conn, dbus_state) = start_service().await?;
+
+ // Create shared config state
+ let config_state = Arc::new(RwLock::new(initial_cfg));
+
+ // Graceful shutdown via CancellationToken
+ let shutdown_token = CancellationToken::new();
+ let ctrl_c_token = shutdown_token.clone();
+ tokio::spawn(async move {
+ if let Err(e) = tokio::signal::ctrl_c().await {
+ error!("Failed to listen for ctrl-c: {}", e);
+ return;
+ }
+ info!("Received shutdown signal, initiating graceful shutdown");
+ ctrl_c_token.cancel();
+ });
+
+ info!(
+ "Starting camper-widget-refresh with refresh interval: {} seconds",
+ config_state.read().await.refresh_interval_seconds
+ );
+
+ // Spawn the config change listener with supervision
+ let mut config_listener_handle = spawn_config_listener(
+ dbus_conn.clone(),
+ config_state.clone(),
+ dbus_state.clone(),
+ shutdown_token.child_token(),
+ );
+ let mut config_backoff_secs: u64 = 1;
+
+ // Run initial refresh
+ {
+ let cfg = config_state.read().await;
+ if let Err(e) = refresh_data(&cfg, &dbus_conn, &dbus_state).await {
+ warn!("Initial refresh failed: {}", e);
+ // Set connected to false when refresh fails
+ if let Err(update_err) = update_connected_status(&dbus_conn, &dbus_state, false).await {
+ warn!("Failed to update connected status: {}", update_err);
+ }
+ }
+ }
+
+ // Run refresh loop with dynamic interval
+ while !shutdown_token.is_cancelled() {
+ // Supervise config listener — respawn with backoff if it died
+ if config_listener_handle.is_finished() && !shutdown_token.is_cancelled() {
+ error!(
+ "Config listener exited unexpectedly, respawning in {}s",
+ config_backoff_secs
+ );
+ time::sleep(Duration::from_secs(config_backoff_secs)).await;
+ config_backoff_secs = (config_backoff_secs * 2).min(MAX_CONFIG_LISTENER_BACKOFF_SECS);
+ config_listener_handle = spawn_config_listener(
+ dbus_conn.clone(),
+ config_state.clone(),
+ dbus_state.clone(),
+ shutdown_token.child_token(),
+ );
+ }
+
+ let refresh_interval = {
+ let cfg = config_state.read().await;
+ Duration::from_secs(cfg.refresh_interval_seconds)
+ };
+
+ // Race sleep against shutdown
+ tokio::select! {
+ _ = time::sleep(refresh_interval) => {}
+ _ = shutdown_token.cancelled() => break,
+ }
+
+ let cfg = config_state.read().await;
+ match refresh_data(&cfg, &dbus_conn, &dbus_state).await {
+ Ok(()) => {
+ // Successful refresh — reset config listener backoff
+ config_backoff_secs = 1;
+ }
+ Err(e) => {
+ warn!("Refresh failed: {}", e);
+ if let Err(update_err) =
+ update_connected_status(&dbus_conn, &dbus_state, false).await
+ {
+ warn!("Failed to update connected status: {}", update_err);
+ }
+ }
+ }
+ }
+
+ info!("Shutting down cleanly");
+ Ok(())
+}
+
+#[cfg(test)]
+mod tests {
+
+ // parse_numeric_value tests moved to victron_mqtt.rs
+
+ #[test]
+ fn test_battery_direction_mapping() {
+ // Test the direction mapping logic
+ let test_cases = vec![
+ (10.0, "charging"),
+ (-10.0, "discharging"),
+ (0.0, "idle"),
+ (3.0, "idle"), // Below threshold
+ (-3.0, "idle"), // Above negative threshold
+ (5.1, "charging"), // Just above threshold
+ (-5.1, "discharging"), // Just below negative threshold
+ ];
+
+ for (power, expected) in test_cases {
+ let direction = if power > 5.0 {
+ "charging".to_string()
+ } else if power < -5.0 {
+ "discharging".to_string()
+ } else {
+ "idle".to_string()
+ };
+
+ assert_eq!(
+ direction, expected,
+ "Power {power} should map to {expected}"
+ );
+ }
+ }
+}