summaryrefslogtreecommitdiff
path: root/src/weather_poller.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/weather_poller.rs')
-rw-r--r--src/weather_poller.rs503
1 files changed, 502 insertions, 1 deletions
diff --git a/src/weather_poller.rs b/src/weather_poller.rs
index ea211b7..76f3fda 100644
--- a/src/weather_poller.rs
+++ b/src/weather_poller.rs
@@ -1 +1,502 @@
-// Remove all unused imports at the top of the file.
+use crate::{
+ notifications::NtfySettingsRepository,
+ users::{UserRepository, UserWithLocation},
+ weather_api_data::WeatherApiDataRepository,
+ weather_thresholds::WeatherThresholdRepository,
+};
+use anyhow::Result;
+use chrono::{DateTime, Utc};
+use reqwest::Client;
+use serde_json::Value;
+use sqlx::SqlitePool;
+use std::env;
+use std::sync::Arc;
+use tokio::time::{Duration, sleep};
+use tracing::{error, info, warn};
+
+const API_URL: &str = "https://api.openweathermap.org/data/2.5/forecast";
+const ALERT_WINDOW_HOURS: i64 = 24;
+
+pub struct WeatherPoller {
+ pool: Arc<SqlitePool>,
+ client: Client,
+}
+
+impl WeatherPoller {
+ pub fn new(pool: Arc<SqlitePool>) -> Self {
+ Self {
+ pool,
+ client: Client::new(),
+ }
+ }
+
+ /// Main function to check weather for all users with locations
+ pub async fn check_all(&self) -> Result<()> {
+ info!("🔄 Checking weather forecast for all users...");
+
+ let user_repo = UserRepository { db: &self.pool };
+ let users_with_locations = user_repo.list_users_with_locations().await?;
+
+ if users_with_locations.is_empty() {
+ info!("No users with locations found");
+ return Ok(());
+ }
+
+ info!("Found {} users with locations", users_with_locations.len());
+
+ // Spawn async tasks for each user-location combination
+ let mut handles = Vec::new();
+ for user_with_location in users_with_locations {
+ let poller = self.clone();
+ let handle =
+ tokio::spawn(async move { poller.check_user_weather(user_with_location).await });
+ handles.push(handle);
+ }
+
+ // Wait for all tasks to complete
+ for handle in handles {
+ if let Err(e) = handle.await {
+ error!("Task failed: {}", e);
+ }
+ }
+
+ Ok(())
+ }
+
+ /// Check weather for a specific user and location
+ async fn check_user_weather(&self, user_with_location: UserWithLocation) -> Result<()> {
+ let user_id = user_with_location.id;
+ let location_id = user_with_location.location_id;
+ let lat = user_with_location.latitude;
+ let lon = user_with_location.longitude;
+
+ info!(
+ "Checking weather for user {} at location {}",
+ user_id, location_id
+ );
+
+ // Round coordinates (similar to Elixir project)
+ let (rounded_lat, rounded_lon) = self.round_coordinates(lat, lon);
+
+ match self.fetch_forecast(rounded_lat, rounded_lon).await {
+ Ok(forecast_data) => {
+ // Store the weather data
+ let api_data_repo = WeatherApiDataRepository { db: &self.pool };
+ let json_data = serde_json::to_string(&forecast_data)?;
+
+ api_data_repo
+ .create_weather_data(
+ user_id,
+ location_id,
+ "openweathermap".to_string(),
+ json_data,
+ )
+ .await?;
+
+ info!(
+ "Weather data stored for user {} location {}",
+ user_id, location_id
+ );
+
+ // Check thresholds after storing data
+ self.check_thresholds(user_id, location_id, &forecast_data)
+ .await?;
+ }
+ Err(e) => {
+ error!("❌ Error fetching forecast for user {}: {}", user_id, e);
+ }
+ }
+
+ Ok(())
+ }
+
+ /// Round coordinates to reduce API calls for nearby locations
+ fn round_coordinates(&self, lat: f64, lon: f64) -> (f64, f64) {
+ // Round to 2 decimal places (approximately 1km precision)
+ let rounded_lat = (lat * 100.0).round() / 100.0;
+ let rounded_lon = (lon * 100.0).round() / 100.0;
+ (rounded_lat, rounded_lon)
+ }
+
+ /// Fetch weather forecast from OpenWeatherMap API
+ async fn fetch_forecast(&self, lat: f64, lon: f64) -> Result<Value> {
+ let api_key = env::var("OPENWEATHERMAP_API_KEY")
+ .map_err(|_| anyhow::anyhow!("OPENWEATHERMAP_API_KEY environment variable not set"))?;
+
+ let response = self
+ .client
+ .get(API_URL)
+ .query(&[
+ ("lat", lat.to_string()),
+ ("lon", lon.to_string()),
+ ("units", "metric".to_string()),
+ ("appid", api_key),
+ ])
+ .send()
+ .await?;
+
+ if !response.status().is_success() {
+ let status = response.status();
+ let body = response.text().await?;
+ return Err(anyhow::anyhow!("API request failed: {} - {}", status, body));
+ }
+
+ let data: Value = response.json().await?;
+ Ok(data)
+ }
+
+ /// Check weather thresholds and send notifications if needed
+ async fn check_thresholds(
+ &self,
+ user_id: i64,
+ _location_id: i64,
+ forecast_data: &Value,
+ ) -> Result<()> {
+ let threshold_repo = WeatherThresholdRepository { db: &self.pool };
+ let thresholds = threshold_repo.list_thresholds(user_id).await?;
+
+ if thresholds.is_empty() {
+ return Ok(());
+ }
+
+ // Get the forecast list from the API response
+ let forecast_list = forecast_data
+ .get("list")
+ .and_then(|v| v.as_array())
+ .ok_or_else(|| anyhow::anyhow!("Invalid forecast data structure"))?;
+
+ let now = Utc::now();
+
+ // Check each forecast entry within the alert window
+ for forecast_entry in forecast_list {
+ if let Some(timestamp) = forecast_entry.get("dt").and_then(|v| v.as_i64()) {
+ let forecast_time = DateTime::from_timestamp(timestamp, 0)
+ .ok_or_else(|| anyhow::anyhow!("Invalid timestamp"))?;
+
+ let hours_diff = (forecast_time - now).num_hours();
+ if hours_diff > ALERT_WINDOW_HOURS {
+ continue; // Skip forecasts beyond our alert window
+ }
+
+ // Check if any thresholds are exceeded
+ for threshold in &thresholds {
+ if !threshold.enabled {
+ continue;
+ }
+
+ if self.is_threshold_exceeded(forecast_entry, threshold)? {
+ info!(
+ "🚨 Threshold exceeded for user {}: {} {} {}",
+ user_id,
+ threshold.condition_type,
+ threshold.operator,
+ threshold.threshold_value
+ );
+
+ // Send notification
+ self.send_notification(user_id, threshold, forecast_entry)
+ .await?;
+
+ // Only send one notification per check to avoid spam
+ return Ok(());
+ }
+ }
+ }
+ }
+
+ Ok(())
+ }
+
+ /// Check if a specific threshold is exceeded
+ fn is_threshold_exceeded(
+ &self,
+ forecast_entry: &Value,
+ threshold: &crate::weather_thresholds::WeatherThreshold,
+ ) -> Result<bool> {
+ let value = match threshold.condition_type.as_str() {
+ "temperature" => forecast_entry
+ .get("main")
+ .and_then(|m| m.get("temp"))
+ .and_then(|t| t.as_f64())
+ .ok_or_else(|| anyhow::anyhow!("Temperature not found in forecast"))?,
+ "rain" => forecast_entry
+ .get("rain")
+ .and_then(|r| r.get("3h"))
+ .and_then(|r| r.as_f64())
+ .unwrap_or(0.0),
+ "wind_speed" => {
+ let wind_speed = forecast_entry
+ .get("wind")
+ .and_then(|w| w.get("speed"))
+ .and_then(|s| s.as_f64())
+ .ok_or_else(|| anyhow::anyhow!("Wind speed not found in forecast"))?;
+ // Convert m/s to km/h
+ wind_speed * 3.6
+ }
+ _ => {
+ warn!("Unknown condition type: {}", threshold.condition_type);
+ return Ok(false);
+ }
+ };
+
+ let threshold_value = threshold.threshold_value;
+ let exceeded = match threshold.operator.as_str() {
+ ">" => value > threshold_value,
+ ">=" => value >= threshold_value,
+ "<" => value < threshold_value,
+ "<=" => value <= threshold_value,
+ "==" => (value - threshold_value).abs() < 0.1, // Small tolerance for floating point
+ _ => {
+ warn!("Unknown operator: {}", threshold.operator);
+ false
+ }
+ };
+
+ if exceeded {
+ info!(
+ "🚨 {} threshold exceeded: {} {} {} (value: {})",
+ threshold.condition_type, value, threshold.operator, threshold_value, value
+ );
+ }
+
+ Ok(exceeded)
+ }
+
+ /// Send notification to user
+ async fn send_notification(
+ &self,
+ user_id: i64,
+ threshold: &crate::weather_thresholds::WeatherThreshold,
+ _forecast_entry: &Value,
+ ) -> Result<()> {
+ let ntfy_repo = NtfySettingsRepository { db: &self.pool };
+
+ if let Ok(Some(ntfy_settings)) = ntfy_repo.get_by_user(user_id).await {
+ if ntfy_settings.enabled {
+ // Create notification message
+ let title = format!(
+ "Weather Alert: {} {} {}",
+ threshold.condition_type, threshold.operator, threshold.threshold_value
+ );
+
+ let message = format!(
+ "Weather threshold exceeded for condition: {} {} {}",
+ threshold.condition_type, threshold.operator, threshold.threshold_value
+ );
+
+ // Send NTFY notification
+ self.send_ntfy_notification(&ntfy_settings, &title, &message)
+ .await?;
+ }
+ }
+
+ Ok(())
+ }
+
+ /// Send NTFY notification
+ async fn send_ntfy_notification(
+ &self,
+ ntfy_settings: &crate::notifications::NtfySettings,
+ title: &str,
+ message: &str,
+ ) -> Result<()> {
+ let url = format!("{}/{}", ntfy_settings.server_url, ntfy_settings.topic);
+
+ let response = self
+ .client
+ .post(&url)
+ .header("Title", title)
+ .header("Priority", ntfy_settings.priority.to_string())
+ .body(message.to_string())
+ .send()
+ .await?;
+
+ if response.status().is_success() {
+ info!("NTFY notification sent successfully");
+ } else {
+ error!("Failed to send NTFY notification: {}", response.status());
+ }
+
+ Ok(())
+ }
+}
+
+impl Clone for WeatherPoller {
+ fn clone(&self) -> Self {
+ Self {
+ pool: Arc::clone(&self.pool),
+ client: Client::new(),
+ }
+ }
+}
+
+/// Background task scheduler for weather polling
+pub struct WeatherScheduler {
+ poller: WeatherPoller,
+}
+
+impl WeatherScheduler {
+ pub fn new(pool: Arc<SqlitePool>) -> Self {
+ Self {
+ poller: WeatherPoller::new(pool),
+ }
+ }
+
+ /// Start the scheduler to run weather checks hourly
+ pub async fn start(&self) -> Result<()> {
+ info!("Starting weather scheduler...");
+
+ loop {
+ info!("Running scheduled weather check...");
+
+ if let Err(e) = self.poller.check_all().await {
+ error!("Weather check failed: {}", e);
+ }
+
+ // Wait for 1 hour before next check
+ sleep(Duration::from_secs(3600)).await;
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use sqlx::{Executor, SqlitePool};
+
+ #[allow(dead_code)]
+ async fn setup_db() -> SqlitePool {
+ let pool = SqlitePool::connect(":memory:").await.unwrap();
+ pool.execute(
+ "CREATE TABLE users (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ user_id TEXT NOT NULL UNIQUE,
+ role TEXT NOT NULL DEFAULT 'user'
+ );",
+ )
+ .await
+ .unwrap();
+ pool.execute(
+ "CREATE TABLE locations (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ latitude REAL NOT NULL,
+ longitude REAL NOT NULL,
+ user_id INTEGER NOT NULL,
+ FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE NO ACTION
+ );",
+ )
+ .await
+ .unwrap();
+ pool.execute(
+ "CREATE TABLE weather_api_data (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ user_id INTEGER NOT NULL,
+ location_id INTEGER NOT NULL,
+ api_type TEXT NOT NULL DEFAULT 'openweathermap',
+ data TEXT NOT NULL,
+ fetched_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE,
+ FOREIGN KEY(location_id) REFERENCES locations(id) ON DELETE CASCADE
+ );",
+ )
+ .await
+ .unwrap();
+ pool.execute(
+ "CREATE TABLE weather_thresholds (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ user_id INTEGER NOT NULL,
+ condition_type TEXT NOT NULL,
+ threshold_value REAL NOT NULL,
+ operator TEXT NOT NULL,
+ enabled BOOLEAN NOT NULL DEFAULT 1,
+ description TEXT,
+ FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE
+ );",
+ )
+ .await
+ .unwrap();
+ pool.execute(
+ "CREATE TABLE user_notification_settings (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ user_id INTEGER NOT NULL,
+ ntfy_enabled BOOLEAN NOT NULL DEFAULT 0,
+ ntfy_topic TEXT,
+ ntfy_server_url TEXT,
+ ntfy_priority INTEGER NOT NULL DEFAULT 3,
+ ntfy_title_template TEXT,
+ ntfy_message_template TEXT,
+ smtp_enabled BOOLEAN NOT NULL DEFAULT 0,
+ smtp_email TEXT,
+ smtp_server TEXT,
+ smtp_port INTEGER,
+ smtp_username TEXT,
+ smtp_password TEXT,
+ smtp_use_tls BOOLEAN NOT NULL DEFAULT 1,
+ smtp_from_email TEXT,
+ smtp_from_name TEXT,
+ smtp_subject_template TEXT,
+ smtp_body_template TEXT,
+ FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE
+ );",
+ )
+ .await
+ .unwrap();
+ pool
+ }
+
+ #[tokio::test]
+ async fn test_round_coordinates() {
+ let pool = SqlitePool::connect(":memory:").await.unwrap();
+ let poller = WeatherPoller::new(Arc::new(pool));
+
+ let (lat, lon) = poller.round_coordinates(60.1699, 24.9384);
+ assert_eq!(lat, 60.17);
+ assert_eq!(lon, 24.94);
+
+ let (lat, lon) = poller.round_coordinates(60.1649, 24.9334);
+ assert_eq!(lat, 60.16);
+ assert_eq!(lon, 24.93);
+ }
+
+ #[tokio::test]
+ async fn test_is_threshold_exceeded() {
+ let pool = SqlitePool::connect(":memory:").await.unwrap();
+ let poller = WeatherPoller::new(Arc::new(pool));
+
+ let forecast_entry = serde_json::json!({
+ "main": {"temp": 25.0},
+ "wind": {"speed": 10.0},
+ "rain": {"3h": 30.0}
+ });
+
+ let threshold = crate::weather_thresholds::WeatherThreshold {
+ id: 1,
+ user_id: 1,
+ condition_type: "temperature".to_string(),
+ threshold_value: 20.0,
+ operator: ">".to_string(),
+ enabled: true,
+ description: None,
+ };
+
+ let exceeded = poller
+ .is_threshold_exceeded(&forecast_entry, &threshold)
+ .unwrap();
+ assert!(exceeded);
+
+ let threshold = crate::weather_thresholds::WeatherThreshold {
+ id: 1,
+ user_id: 1,
+ condition_type: "temperature".to_string(),
+ threshold_value: 30.0,
+ operator: ">".to_string(),
+ enabled: true,
+ description: None,
+ };
+
+ let exceeded = poller
+ .is_threshold_exceeded(&forecast_entry, &threshold)
+ .unwrap();
+ assert!(!exceeded);
+ }
+}