use crate::{ notifications::NtfySettingsRepository, users::{UserRepository, UserWithLocation}, weather_api_data::WeatherApiDataRepository, weather_thresholds::WeatherThresholdRepository, }; use anyhow::Result; use chrono::{DateTime, Utc}; use reqwest::Client; use serde_json::Value; use sqlx::SqlitePool; use std::env; use std::sync::Arc; use tokio::time::{Duration, sleep}; use tracing::{error, info, warn}; const API_URL: &str = "https://api.openweathermap.org/data/2.5/forecast"; const ALERT_WINDOW_HOURS: i64 = 24; pub struct WeatherPoller { pool: Arc, client: Client, } impl WeatherPoller { pub fn new(pool: Arc) -> Self { Self { pool, client: Client::new(), } } /// Main function to check weather for all users with locations pub async fn check_all(&self) -> Result<()> { info!("🔄 Checking weather forecast for all users..."); let user_repo = UserRepository { db: &self.pool }; let users_with_locations = user_repo.list_users_with_locations().await?; if users_with_locations.is_empty() { info!("No users with locations found"); return Ok(()); } info!("Found {} users with locations", users_with_locations.len()); // Spawn async tasks for each user-location combination let mut handles = Vec::new(); for user_with_location in users_with_locations { let poller = self.clone(); let handle = tokio::spawn(async move { poller.check_user_weather(user_with_location).await }); handles.push(handle); } // Wait for all tasks to complete for handle in handles { if let Err(e) = handle.await { error!("Task failed: {}", e); } } Ok(()) } /// Check weather for a specific user and location async fn check_user_weather(&self, user_with_location: UserWithLocation) -> Result<()> { let user_id = user_with_location.id; let location_id = user_with_location.location_id; let lat = user_with_location.latitude; let lon = user_with_location.longitude; info!( "Checking weather for user {} at location {}", user_id, location_id ); // Round coordinates (similar to Elixir project) let (rounded_lat, rounded_lon) = self.round_coordinates(lat, lon); match self.fetch_forecast(rounded_lat, rounded_lon).await { Ok(forecast_data) => { // Store the weather data let api_data_repo = WeatherApiDataRepository { db: &self.pool }; let json_data = serde_json::to_string(&forecast_data)?; api_data_repo .create_weather_data( user_id, location_id, "openweathermap".to_string(), json_data, ) .await?; info!( "Weather data stored for user {} location {}", user_id, location_id ); // Check thresholds after storing data self.check_thresholds(user_id, location_id, &forecast_data) .await?; } Err(e) => { error!("❌ Error fetching forecast for user {}: {}", user_id, e); } } Ok(()) } /// Round coordinates to reduce API calls for nearby locations fn round_coordinates(&self, lat: f64, lon: f64) -> (f64, f64) { // Round to 2 decimal places (approximately 1km precision) let rounded_lat = (lat * 100.0).round() / 100.0; let rounded_lon = (lon * 100.0).round() / 100.0; (rounded_lat, rounded_lon) } /// Fetch weather forecast from OpenWeatherMap API async fn fetch_forecast(&self, lat: f64, lon: f64) -> Result { let api_key = env::var("OPENWEATHERMAP_API_KEY") .map_err(|_| anyhow::anyhow!("OPENWEATHERMAP_API_KEY environment variable not set"))?; let response = self .client .get(API_URL) .query(&[ ("lat", lat.to_string()), ("lon", lon.to_string()), ("units", "metric".to_string()), ("appid", api_key), ]) .send() .await?; if !response.status().is_success() { let status = response.status(); let body = response.text().await?; return Err(anyhow::anyhow!("API request failed: {} - {}", status, body)); } let data: Value = response.json().await?; Ok(data) } /// Check weather thresholds and send notifications if needed async fn check_thresholds( &self, user_id: i64, _location_id: i64, forecast_data: &Value, ) -> Result<()> { let threshold_repo = WeatherThresholdRepository { db: &self.pool }; let thresholds = threshold_repo.list_thresholds(user_id).await?; if thresholds.is_empty() { return Ok(()); } // Get the forecast list from the API response let forecast_list = forecast_data .get("list") .and_then(|v| v.as_array()) .ok_or_else(|| anyhow::anyhow!("Invalid forecast data structure"))?; let now = Utc::now(); // Check each forecast entry within the alert window for forecast_entry in forecast_list { if let Some(timestamp) = forecast_entry.get("dt").and_then(|v| v.as_i64()) { let forecast_time = DateTime::from_timestamp(timestamp, 0) .ok_or_else(|| anyhow::anyhow!("Invalid timestamp"))?; let hours_diff = (forecast_time - now).num_hours(); if hours_diff > ALERT_WINDOW_HOURS { continue; // Skip forecasts beyond our alert window } // Check if any thresholds are exceeded for threshold in &thresholds { if !threshold.enabled { continue; } if self.is_threshold_exceeded(forecast_entry, threshold)? { info!( "🚨 Threshold exceeded for user {}: {} {} {}", user_id, threshold.condition_type, threshold.operator, threshold.threshold_value ); // Send notification self.send_notification(user_id, threshold, forecast_entry) .await?; // Only send one notification per check to avoid spam return Ok(()); } } } } Ok(()) } /// Check if a specific threshold is exceeded fn is_threshold_exceeded( &self, forecast_entry: &Value, threshold: &crate::weather_thresholds::WeatherThreshold, ) -> Result { let value = match threshold.condition_type.as_str() { "temperature" => forecast_entry .get("main") .and_then(|m| m.get("temp")) .and_then(|t| t.as_f64()) .ok_or_else(|| anyhow::anyhow!("Temperature not found in forecast"))?, "rain" => forecast_entry .get("rain") .and_then(|r| r.get("3h")) .and_then(|r| r.as_f64()) .unwrap_or(0.0), "wind_speed" => { let wind_speed = forecast_entry .get("wind") .and_then(|w| w.get("speed")) .and_then(|s| s.as_f64()) .ok_or_else(|| anyhow::anyhow!("Wind speed not found in forecast"))?; // Convert m/s to km/h wind_speed * 3.6 } _ => { warn!("Unknown condition type: {}", threshold.condition_type); return Ok(false); } }; let threshold_value = threshold.threshold_value; let exceeded = match threshold.operator.as_str() { ">" => value > threshold_value, ">=" => value >= threshold_value, "<" => value < threshold_value, "<=" => value <= threshold_value, "==" => (value - threshold_value).abs() < 0.1, // Small tolerance for floating point _ => { warn!("Unknown operator: {}", threshold.operator); false } }; if exceeded { info!( "🚨 {} threshold exceeded: {} {} {} (value: {})", threshold.condition_type, value, threshold.operator, threshold_value, value ); } Ok(exceeded) } /// Send notification to user async fn send_notification( &self, user_id: i64, threshold: &crate::weather_thresholds::WeatherThreshold, _forecast_entry: &Value, ) -> Result<()> { let ntfy_repo = NtfySettingsRepository { db: &self.pool }; if let Ok(Some(ntfy_settings)) = ntfy_repo.get_by_user(user_id).await { if ntfy_settings.enabled { // Create notification message let title = format!( "Weather Alert: {} {} {}", threshold.condition_type, threshold.operator, threshold.threshold_value ); let message = format!( "Weather threshold exceeded for condition: {} {} {}", threshold.condition_type, threshold.operator, threshold.threshold_value ); // Send NTFY notification self.send_ntfy_notification(&ntfy_settings, &title, &message) .await?; } } Ok(()) } /// Send NTFY notification async fn send_ntfy_notification( &self, ntfy_settings: &crate::notifications::NtfySettings, title: &str, message: &str, ) -> Result<()> { let url = format!("{}/{}", ntfy_settings.server_url, ntfy_settings.topic); let response = self .client .post(&url) .header("Title", title) .header("Priority", ntfy_settings.priority.to_string()) .body(message.to_string()) .send() .await?; if response.status().is_success() { info!("NTFY notification sent successfully"); } else { error!("Failed to send NTFY notification: {}", response.status()); } Ok(()) } } impl Clone for WeatherPoller { fn clone(&self) -> Self { Self { pool: Arc::clone(&self.pool), client: Client::new(), } } } /// Background task scheduler for weather polling pub struct WeatherScheduler { poller: WeatherPoller, } impl WeatherScheduler { pub fn new(pool: Arc) -> Self { Self { poller: WeatherPoller::new(pool), } } /// Start the scheduler to run weather checks hourly pub async fn start(&self) -> Result<()> { info!("Starting weather scheduler..."); loop { info!("Running scheduled weather check..."); if let Err(e) = self.poller.check_all().await { error!("Weather check failed: {}", e); } // Wait for 1 hour before next check sleep(Duration::from_secs(3600)).await; } } } #[cfg(test)] mod tests { use super::*; use sqlx::{Executor, SqlitePool}; #[allow(dead_code)] async fn setup_db() -> SqlitePool { let pool = SqlitePool::connect(":memory:").await.unwrap(); pool.execute( "CREATE TABLE users ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id TEXT NOT NULL UNIQUE, role TEXT NOT NULL DEFAULT 'user' );", ) .await .unwrap(); pool.execute( "CREATE TABLE locations ( id INTEGER PRIMARY KEY AUTOINCREMENT, latitude REAL NOT NULL, longitude REAL NOT NULL, user_id INTEGER NOT NULL, FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE NO ACTION );", ) .await .unwrap(); pool.execute( "CREATE TABLE weather_api_data ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER NOT NULL, location_id INTEGER NOT NULL, api_type TEXT NOT NULL DEFAULT 'openweathermap', data TEXT NOT NULL, fetched_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE, FOREIGN KEY(location_id) REFERENCES locations(id) ON DELETE CASCADE );", ) .await .unwrap(); pool.execute( "CREATE TABLE weather_thresholds ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER NOT NULL, condition_type TEXT NOT NULL, threshold_value REAL NOT NULL, operator TEXT NOT NULL, enabled BOOLEAN NOT NULL DEFAULT 1, description TEXT, FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE );", ) .await .unwrap(); pool.execute( "CREATE TABLE user_notification_settings ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER NOT NULL, ntfy_enabled BOOLEAN NOT NULL DEFAULT 0, ntfy_topic TEXT, ntfy_server_url TEXT, ntfy_priority INTEGER NOT NULL DEFAULT 3, ntfy_title_template TEXT, ntfy_message_template TEXT, smtp_enabled BOOLEAN NOT NULL DEFAULT 0, smtp_email TEXT, smtp_server TEXT, smtp_port INTEGER, smtp_username TEXT, smtp_password TEXT, smtp_use_tls BOOLEAN NOT NULL DEFAULT 1, smtp_from_email TEXT, smtp_from_name TEXT, smtp_subject_template TEXT, smtp_body_template TEXT, FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE );", ) .await .unwrap(); pool } #[tokio::test] async fn test_round_coordinates() { let pool = SqlitePool::connect(":memory:").await.unwrap(); let poller = WeatherPoller::new(Arc::new(pool)); let (lat, lon) = poller.round_coordinates(60.1699, 24.9384); assert_eq!(lat, 60.17); assert_eq!(lon, 24.94); let (lat, lon) = poller.round_coordinates(60.1649, 24.9334); assert_eq!(lat, 60.16); assert_eq!(lon, 24.93); } #[tokio::test] async fn test_is_threshold_exceeded() { let pool = SqlitePool::connect(":memory:").await.unwrap(); let poller = WeatherPoller::new(Arc::new(pool)); let forecast_entry = serde_json::json!({ "main": {"temp": 25.0}, "wind": {"speed": 10.0}, "rain": {"3h": 30.0} }); let threshold = crate::weather_thresholds::WeatherThreshold { id: 1, user_id: 1, condition_type: "temperature".to_string(), threshold_value: 20.0, operator: ">".to_string(), enabled: true, description: None, }; let exceeded = poller .is_threshold_exceeded(&forecast_entry, &threshold) .unwrap(); assert!(exceeded); let threshold = crate::weather_thresholds::WeatherThreshold { id: 1, user_id: 1, condition_type: "temperature".to_string(), threshold_value: 30.0, operator: ">".to_string(), enabled: true, description: None, }; let exceeded = poller .is_threshold_exceeded(&forecast_entry, &threshold) .unwrap(); assert!(!exceeded); } }