diff options
Diffstat (limited to 'src/weather_poller.rs')
| -rw-r--r-- | src/weather_poller.rs | 503 |
1 files changed, 502 insertions, 1 deletions
diff --git a/src/weather_poller.rs b/src/weather_poller.rs index ea211b7..76f3fda 100644 --- a/src/weather_poller.rs +++ b/src/weather_poller.rs @@ -1 +1,502 @@ -// Remove all unused imports at the top of the file. +use crate::{ + notifications::NtfySettingsRepository, + users::{UserRepository, UserWithLocation}, + weather_api_data::WeatherApiDataRepository, + weather_thresholds::WeatherThresholdRepository, +}; +use anyhow::Result; +use chrono::{DateTime, Utc}; +use reqwest::Client; +use serde_json::Value; +use sqlx::SqlitePool; +use std::env; +use std::sync::Arc; +use tokio::time::{Duration, sleep}; +use tracing::{error, info, warn}; + +const API_URL: &str = "https://api.openweathermap.org/data/2.5/forecast"; +const ALERT_WINDOW_HOURS: i64 = 24; + +pub struct WeatherPoller { + pool: Arc<SqlitePool>, + client: Client, +} + +impl WeatherPoller { + pub fn new(pool: Arc<SqlitePool>) -> Self { + Self { + pool, + client: Client::new(), + } + } + + /// Main function to check weather for all users with locations + pub async fn check_all(&self) -> Result<()> { + info!("🔄 Checking weather forecast for all users..."); + + let user_repo = UserRepository { db: &self.pool }; + let users_with_locations = user_repo.list_users_with_locations().await?; + + if users_with_locations.is_empty() { + info!("No users with locations found"); + return Ok(()); + } + + info!("Found {} users with locations", users_with_locations.len()); + + // Spawn async tasks for each user-location combination + let mut handles = Vec::new(); + for user_with_location in users_with_locations { + let poller = self.clone(); + let handle = + tokio::spawn(async move { poller.check_user_weather(user_with_location).await }); + handles.push(handle); + } + + // Wait for all tasks to complete + for handle in handles { + if let Err(e) = handle.await { + error!("Task failed: {}", e); + } + } + + Ok(()) + } + + /// Check weather for a specific user and location + async fn check_user_weather(&self, user_with_location: UserWithLocation) -> Result<()> { + let user_id = user_with_location.id; + let location_id = user_with_location.location_id; + let lat = user_with_location.latitude; + let lon = user_with_location.longitude; + + info!( + "Checking weather for user {} at location {}", + user_id, location_id + ); + + // Round coordinates (similar to Elixir project) + let (rounded_lat, rounded_lon) = self.round_coordinates(lat, lon); + + match self.fetch_forecast(rounded_lat, rounded_lon).await { + Ok(forecast_data) => { + // Store the weather data + let api_data_repo = WeatherApiDataRepository { db: &self.pool }; + let json_data = serde_json::to_string(&forecast_data)?; + + api_data_repo + .create_weather_data( + user_id, + location_id, + "openweathermap".to_string(), + json_data, + ) + .await?; + + info!( + "Weather data stored for user {} location {}", + user_id, location_id + ); + + // Check thresholds after storing data + self.check_thresholds(user_id, location_id, &forecast_data) + .await?; + } + Err(e) => { + error!("❌ Error fetching forecast for user {}: {}", user_id, e); + } + } + + Ok(()) + } + + /// Round coordinates to reduce API calls for nearby locations + fn round_coordinates(&self, lat: f64, lon: f64) -> (f64, f64) { + // Round to 2 decimal places (approximately 1km precision) + let rounded_lat = (lat * 100.0).round() / 100.0; + let rounded_lon = (lon * 100.0).round() / 100.0; + (rounded_lat, rounded_lon) + } + + /// Fetch weather forecast from OpenWeatherMap API + async fn fetch_forecast(&self, lat: f64, lon: f64) -> Result<Value> { + let api_key = env::var("OPENWEATHERMAP_API_KEY") + .map_err(|_| anyhow::anyhow!("OPENWEATHERMAP_API_KEY environment variable not set"))?; + + let response = self + .client + .get(API_URL) + .query(&[ + ("lat", lat.to_string()), + ("lon", lon.to_string()), + ("units", "metric".to_string()), + ("appid", api_key), + ]) + .send() + .await?; + + if !response.status().is_success() { + let status = response.status(); + let body = response.text().await?; + return Err(anyhow::anyhow!("API request failed: {} - {}", status, body)); + } + + let data: Value = response.json().await?; + Ok(data) + } + + /// Check weather thresholds and send notifications if needed + async fn check_thresholds( + &self, + user_id: i64, + _location_id: i64, + forecast_data: &Value, + ) -> Result<()> { + let threshold_repo = WeatherThresholdRepository { db: &self.pool }; + let thresholds = threshold_repo.list_thresholds(user_id).await?; + + if thresholds.is_empty() { + return Ok(()); + } + + // Get the forecast list from the API response + let forecast_list = forecast_data + .get("list") + .and_then(|v| v.as_array()) + .ok_or_else(|| anyhow::anyhow!("Invalid forecast data structure"))?; + + let now = Utc::now(); + + // Check each forecast entry within the alert window + for forecast_entry in forecast_list { + if let Some(timestamp) = forecast_entry.get("dt").and_then(|v| v.as_i64()) { + let forecast_time = DateTime::from_timestamp(timestamp, 0) + .ok_or_else(|| anyhow::anyhow!("Invalid timestamp"))?; + + let hours_diff = (forecast_time - now).num_hours(); + if hours_diff > ALERT_WINDOW_HOURS { + continue; // Skip forecasts beyond our alert window + } + + // Check if any thresholds are exceeded + for threshold in &thresholds { + if !threshold.enabled { + continue; + } + + if self.is_threshold_exceeded(forecast_entry, threshold)? { + info!( + "🚨 Threshold exceeded for user {}: {} {} {}", + user_id, + threshold.condition_type, + threshold.operator, + threshold.threshold_value + ); + + // Send notification + self.send_notification(user_id, threshold, forecast_entry) + .await?; + + // Only send one notification per check to avoid spam + return Ok(()); + } + } + } + } + + Ok(()) + } + + /// Check if a specific threshold is exceeded + fn is_threshold_exceeded( + &self, + forecast_entry: &Value, + threshold: &crate::weather_thresholds::WeatherThreshold, + ) -> Result<bool> { + let value = match threshold.condition_type.as_str() { + "temperature" => forecast_entry + .get("main") + .and_then(|m| m.get("temp")) + .and_then(|t| t.as_f64()) + .ok_or_else(|| anyhow::anyhow!("Temperature not found in forecast"))?, + "rain" => forecast_entry + .get("rain") + .and_then(|r| r.get("3h")) + .and_then(|r| r.as_f64()) + .unwrap_or(0.0), + "wind_speed" => { + let wind_speed = forecast_entry + .get("wind") + .and_then(|w| w.get("speed")) + .and_then(|s| s.as_f64()) + .ok_or_else(|| anyhow::anyhow!("Wind speed not found in forecast"))?; + // Convert m/s to km/h + wind_speed * 3.6 + } + _ => { + warn!("Unknown condition type: {}", threshold.condition_type); + return Ok(false); + } + }; + + let threshold_value = threshold.threshold_value; + let exceeded = match threshold.operator.as_str() { + ">" => value > threshold_value, + ">=" => value >= threshold_value, + "<" => value < threshold_value, + "<=" => value <= threshold_value, + "==" => (value - threshold_value).abs() < 0.1, // Small tolerance for floating point + _ => { + warn!("Unknown operator: {}", threshold.operator); + false + } + }; + + if exceeded { + info!( + "🚨 {} threshold exceeded: {} {} {} (value: {})", + threshold.condition_type, value, threshold.operator, threshold_value, value + ); + } + + Ok(exceeded) + } + + /// Send notification to user + async fn send_notification( + &self, + user_id: i64, + threshold: &crate::weather_thresholds::WeatherThreshold, + _forecast_entry: &Value, + ) -> Result<()> { + let ntfy_repo = NtfySettingsRepository { db: &self.pool }; + + if let Ok(Some(ntfy_settings)) = ntfy_repo.get_by_user(user_id).await { + if ntfy_settings.enabled { + // Create notification message + let title = format!( + "Weather Alert: {} {} {}", + threshold.condition_type, threshold.operator, threshold.threshold_value + ); + + let message = format!( + "Weather threshold exceeded for condition: {} {} {}", + threshold.condition_type, threshold.operator, threshold.threshold_value + ); + + // Send NTFY notification + self.send_ntfy_notification(&ntfy_settings, &title, &message) + .await?; + } + } + + Ok(()) + } + + /// Send NTFY notification + async fn send_ntfy_notification( + &self, + ntfy_settings: &crate::notifications::NtfySettings, + title: &str, + message: &str, + ) -> Result<()> { + let url = format!("{}/{}", ntfy_settings.server_url, ntfy_settings.topic); + + let response = self + .client + .post(&url) + .header("Title", title) + .header("Priority", ntfy_settings.priority.to_string()) + .body(message.to_string()) + .send() + .await?; + + if response.status().is_success() { + info!("NTFY notification sent successfully"); + } else { + error!("Failed to send NTFY notification: {}", response.status()); + } + + Ok(()) + } +} + +impl Clone for WeatherPoller { + fn clone(&self) -> Self { + Self { + pool: Arc::clone(&self.pool), + client: Client::new(), + } + } +} + +/// Background task scheduler for weather polling +pub struct WeatherScheduler { + poller: WeatherPoller, +} + +impl WeatherScheduler { + pub fn new(pool: Arc<SqlitePool>) -> Self { + Self { + poller: WeatherPoller::new(pool), + } + } + + /// Start the scheduler to run weather checks hourly + pub async fn start(&self) -> Result<()> { + info!("Starting weather scheduler..."); + + loop { + info!("Running scheduled weather check..."); + + if let Err(e) = self.poller.check_all().await { + error!("Weather check failed: {}", e); + } + + // Wait for 1 hour before next check + sleep(Duration::from_secs(3600)).await; + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use sqlx::{Executor, SqlitePool}; + + #[allow(dead_code)] + async fn setup_db() -> SqlitePool { + let pool = SqlitePool::connect(":memory:").await.unwrap(); + pool.execute( + "CREATE TABLE users ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id TEXT NOT NULL UNIQUE, + role TEXT NOT NULL DEFAULT 'user' + );", + ) + .await + .unwrap(); + pool.execute( + "CREATE TABLE locations ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + latitude REAL NOT NULL, + longitude REAL NOT NULL, + user_id INTEGER NOT NULL, + FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE NO ACTION + );", + ) + .await + .unwrap(); + pool.execute( + "CREATE TABLE weather_api_data ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id INTEGER NOT NULL, + location_id INTEGER NOT NULL, + api_type TEXT NOT NULL DEFAULT 'openweathermap', + data TEXT NOT NULL, + fetched_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE, + FOREIGN KEY(location_id) REFERENCES locations(id) ON DELETE CASCADE + );", + ) + .await + .unwrap(); + pool.execute( + "CREATE TABLE weather_thresholds ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id INTEGER NOT NULL, + condition_type TEXT NOT NULL, + threshold_value REAL NOT NULL, + operator TEXT NOT NULL, + enabled BOOLEAN NOT NULL DEFAULT 1, + description TEXT, + FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE + );", + ) + .await + .unwrap(); + pool.execute( + "CREATE TABLE user_notification_settings ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id INTEGER NOT NULL, + ntfy_enabled BOOLEAN NOT NULL DEFAULT 0, + ntfy_topic TEXT, + ntfy_server_url TEXT, + ntfy_priority INTEGER NOT NULL DEFAULT 3, + ntfy_title_template TEXT, + ntfy_message_template TEXT, + smtp_enabled BOOLEAN NOT NULL DEFAULT 0, + smtp_email TEXT, + smtp_server TEXT, + smtp_port INTEGER, + smtp_username TEXT, + smtp_password TEXT, + smtp_use_tls BOOLEAN NOT NULL DEFAULT 1, + smtp_from_email TEXT, + smtp_from_name TEXT, + smtp_subject_template TEXT, + smtp_body_template TEXT, + FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE + );", + ) + .await + .unwrap(); + pool + } + + #[tokio::test] + async fn test_round_coordinates() { + let pool = SqlitePool::connect(":memory:").await.unwrap(); + let poller = WeatherPoller::new(Arc::new(pool)); + + let (lat, lon) = poller.round_coordinates(60.1699, 24.9384); + assert_eq!(lat, 60.17); + assert_eq!(lon, 24.94); + + let (lat, lon) = poller.round_coordinates(60.1649, 24.9334); + assert_eq!(lat, 60.16); + assert_eq!(lon, 24.93); + } + + #[tokio::test] + async fn test_is_threshold_exceeded() { + let pool = SqlitePool::connect(":memory:").await.unwrap(); + let poller = WeatherPoller::new(Arc::new(pool)); + + let forecast_entry = serde_json::json!({ + "main": {"temp": 25.0}, + "wind": {"speed": 10.0}, + "rain": {"3h": 30.0} + }); + + let threshold = crate::weather_thresholds::WeatherThreshold { + id: 1, + user_id: 1, + condition_type: "temperature".to_string(), + threshold_value: 20.0, + operator: ">".to_string(), + enabled: true, + description: None, + }; + + let exceeded = poller + .is_threshold_exceeded(&forecast_entry, &threshold) + .unwrap(); + assert!(exceeded); + + let threshold = crate::weather_thresholds::WeatherThreshold { + id: 1, + user_id: 1, + condition_type: "temperature".to_string(), + threshold_value: 30.0, + operator: ">".to_string(), + enabled: true, + description: None, + }; + + let exceeded = poller + .is_threshold_exceeded(&forecast_entry, &threshold) + .unwrap(); + assert!(!exceeded); + } +} |
