// path: root/service/src/main.rs
// blob: 51f7ff7cc67d0ed0d60d7c08865ca2da2e0e3730
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use camper_widget_refresh::cache::{read_serial_cache, write_serial_cache};
use camper_widget_refresh::config::{Config, listen_for_config_changes, load_config};
use camper_widget_refresh::dbus::{self, start_service, update_connected_status};
use camper_widget_refresh::victron_mqtt::{
    build_mqtt_options, disconnect_mqtt, pick_first_available,
    read_values_and_update_properties_immediately, wait_for_serial,
};
use rumqttc::AsyncClient;
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use tokio::time;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};

/// Upper bound, in seconds, on the delay before respawning the config
/// change listener; the delay doubles on each respawn and is capped here.
const MAX_CONFIG_LISTENER_BACKOFF_SECS: u64 = 30;

/// Runs a single refresh cycle: connect to MQTT, resolve the device serial,
/// read the values and push them to the D-Bus properties.
///
/// The sequence is:
/// 1. Pick an endpoint from `cfg.endpoints` via `pick_first_available`.
/// 2. Publish `connected = true` on D-Bus.
/// 3. Create the MQTT client and resolve the serial — from the cache when
///    present, otherwise by waiting on the serial topic and caching the
///    answer for 30 days (a cache write failure is only logged).
/// 4. Read values, updating D-Bus properties as messages arrive.
/// 5. Disconnect the MQTT client, then report the outcome.
///
/// # Errors
///
/// Propagates failures from endpoint selection, the connected-status update,
/// waiting for the serial, and reading values. A read failure is only
/// propagated after `disconnect_mqtt` has been called.
async fn refresh_data(
    cfg: &Config,
    conn: &zbus::Connection,
    dbus_state: &dbus::SharedState,
) -> Result<()> {
    // Long TTL for the cached serial: 30 days.
    const SERIAL_CACHE_TTL: Duration = Duration::from_secs(30 * 24 * 3600);

    let (host, port) = pick_first_available(&cfg.endpoints).await?;
    info!("Connecting to MQTT {}:{} as {}", host, port, cfg.client_id);

    // A reachable host was found, so flag the service as connected.
    update_connected_status(conn, dbus_state, true).await?;

    let (client, mut eventloop) = AsyncClient::new(build_mqtt_options(cfg, &host, port), 10);

    // A cache read error is not fatal: treat it like a cache miss.
    let cached_serial = match read_serial_cache().await {
        Ok(entry) => entry,
        Err(e) => {
            warn!("Failed reading cache: {}", e);
            None
        }
    };

    let serial = match cached_serial {
        Some(known) => {
            info!("Using cached serial: {}", known);
            known
        }
        None => {
            info!("Waiting for serial on N/+/system/0/Serial...");
            let discovered = wait_for_serial(&client, &mut eventloop).await?;
            if let Err(e) = write_serial_cache(&discovered, SERIAL_CACHE_TTL).await {
                warn!("Failed to write cache: {}", e);
            }
            discovered
        }
    };

    // Hold on to the outcome so the client is disconnected before any error
    // is propagated.
    let outcome = read_values_and_update_properties_immediately(
        &client,
        &mut eventloop,
        &serial,
        conn,
        dbus_state,
    )
    .await;

    disconnect_mqtt(&client).await;

    if outcome? {
        info!("Completed refresh cycle - all properties updated");
    } else {
        warn!("Refresh cycle completed with partial data - some MQTT topics timed out");
    }

    Ok(())
}

/// Spawns `listen_for_config_changes` as a background tokio task.
///
/// The task takes ownership of the connection, the shared config, the D-Bus
/// state and the cancellation token. If the listener returns an error it is
/// logged and the task ends; the returned handle lets the caller detect that.
fn spawn_config_listener(
    conn: zbus::Connection,
    config_state: Arc<RwLock<Config>>,
    shared_state: dbus::SharedState,
    token: CancellationToken,
) -> JoinHandle<()> {
    let listener_task = async move {
        let outcome = listen_for_config_changes(&conn, config_state, &shared_state, token).await;
        if let Err(e) = outcome {
            error!("Config change listener failed: {}", e);
        }
    };
    tokio::spawn(listener_task)
}

#[tokio::main]
async fn main() -> Result<()> {
    // Initialize tracing early so debug logs from config loading are visible
    let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
        .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"));
    let _ = tracing::subscriber::set_global_default(
        tracing_subscriber::fmt()
            .with_env_filter(env_filter)
            .with_target(false)
            .finish(),
    );

    let initial_cfg = load_config().await;

    // Start D-Bus service and keep connection/state alive
    let (dbus_conn, dbus_state) = start_service().await?;

    // Create shared config state
    let config_state = Arc::new(RwLock::new(initial_cfg));

    // Graceful shutdown via CancellationToken
    let shutdown_token = CancellationToken::new();
    let ctrl_c_token = shutdown_token.clone();
    tokio::spawn(async move {
        if let Err(e) = tokio::signal::ctrl_c().await {
            error!("Failed to listen for ctrl-c: {}", e);
            return;
        }
        info!("Received shutdown signal, initiating graceful shutdown");
        ctrl_c_token.cancel();
    });

    info!(
        "Starting camper-widget-refresh with refresh interval: {} seconds",
        config_state.read().await.refresh_interval_seconds
    );

    // Spawn the config change listener with supervision
    let mut config_listener_handle = spawn_config_listener(
        dbus_conn.clone(),
        config_state.clone(),
        dbus_state.clone(),
        shutdown_token.child_token(),
    );
    let mut config_backoff_secs: u64 = 1;

    // Run initial refresh
    {
        let cfg = config_state.read().await;
        if let Err(e) = refresh_data(&cfg, &dbus_conn, &dbus_state).await {
            warn!("Initial refresh failed: {}", e);
            // Set connected to false when refresh fails
            if let Err(update_err) = update_connected_status(&dbus_conn, &dbus_state, false).await {
                warn!("Failed to update connected status: {}", update_err);
            }
        }
    }

    // Run refresh loop with dynamic interval
    while !shutdown_token.is_cancelled() {
        // Supervise config listener — respawn with backoff if it died
        if config_listener_handle.is_finished() && !shutdown_token.is_cancelled() {
            error!(
                "Config listener exited unexpectedly, respawning in {}s",
                config_backoff_secs
            );
            time::sleep(Duration::from_secs(config_backoff_secs)).await;
            config_backoff_secs = (config_backoff_secs * 2).min(MAX_CONFIG_LISTENER_BACKOFF_SECS);
            config_listener_handle = spawn_config_listener(
                dbus_conn.clone(),
                config_state.clone(),
                dbus_state.clone(),
                shutdown_token.child_token(),
            );
        }

        let refresh_interval = {
            let cfg = config_state.read().await;
            Duration::from_secs(cfg.refresh_interval_seconds)
        };

        // Race sleep against shutdown
        tokio::select! {
            _ = time::sleep(refresh_interval) => {}
            _ = shutdown_token.cancelled() => break,
        }

        let cfg = config_state.read().await;
        match refresh_data(&cfg, &dbus_conn, &dbus_state).await {
            Ok(()) => {
                // Successful refresh — reset config listener backoff
                config_backoff_secs = 1;
            }
            Err(e) => {
                warn!("Refresh failed: {}", e);
                if let Err(update_err) =
                    update_connected_status(&dbus_conn, &dbus_state, false).await
                {
                    warn!("Failed to update connected status: {}", update_err);
                }
            }
        }
    }

    info!("Shutting down cleanly");
    Ok(())
}

#[cfg(test)]
mod tests {

    // parse_numeric_value tests moved to victron_mqtt.rs

    #[test]
    fn test_battery_direction_mapping() {
        // Test the direction mapping logic
        let test_cases = vec![
            (10.0, "charging"),
            (-10.0, "discharging"),
            (0.0, "idle"),
            (3.0, "idle"),         // Below threshold
            (-3.0, "idle"),        // Above negative threshold
            (5.1, "charging"),     // Just above threshold
            (-5.1, "discharging"), // Just below negative threshold
        ];

        for (power, expected) in test_cases {
            let direction = if power > 5.0 {
                "charging".to_string()
            } else if power < -5.0 {
                "discharging".to_string()
            } else {
                "idle".to_string()
            };

            assert_eq!(
                direction, expected,
                "Power {power} should map to {expected}"
            );
        }
    }
}