From 934fb31059da10fa843d96a10c37f181eaa89456 Mon Sep 17 00:00:00 2001 From: Dawid Rycerz Date: Mon, 21 Jul 2025 21:10:22 +0300 Subject: feat: add weather pooler and config --- README.md | 103 ++++- .../20240101000400_create_weather_api_data.sql | 15 + src/config.rs | 160 +++++++ src/main.rs | 94 +++- src/users.rs | 21 + src/weather_api_data.rs | 227 ++++++++++ src/weather_poller.rs | 503 ++++++++++++++++++++- 7 files changed, 1089 insertions(+), 34 deletions(-) create mode 100644 migrations/20240101000400_create_weather_api_data.sql create mode 100644 src/config.rs create mode 100644 src/weather_api_data.rs diff --git a/README.md b/README.md index 61248fd..fc7382b 100644 --- a/README.md +++ b/README.md @@ -4,11 +4,14 @@ Silmataivas is a weather monitoring service that sends personalized alerts based ## Features -- Weather monitoring using OpenWeatherMap API -- Custom weather thresholds per user -- Flexible notifications: NTFY (push) and SMTP (email) -- User-specific configuration -- RESTful API for all resources +- **Weather monitoring using OpenWeatherMap API**: Automatic hourly weather data fetching for all users with locations +- **Location-based weather tracking**: Each user can set their location for personalized weather monitoring +- **Custom weather thresholds per user**: Define temperature, rain, and wind speed thresholds with custom operators +- **Automatic threshold checking**: After fetching weather data, the system automatically checks if any thresholds are exceeded +- **Flexible notifications**: NTFY (push) and SMTP (email) notifications when thresholds are exceeded +- **User-specific configuration**: Each user has their own settings and thresholds +- **RESTful API for all resources**: Complete API for managing users, locations, thresholds, and notifications +- **Background task scheduling**: Weather polling runs automatically every hour in the background ## Project Structure @@ -28,21 +31,99 @@ Silmataivas is a weather monitoring service that sends personalized alerts based ```bash # Clone the repository - git clone https://codeberg.org/silmataivas/silmataivas.git - cd silmataivas +git clone https://codeberg.org/silmataivas/silmataivas.git +cd silmataivas + +# Set required environment variables +export OPENWEATHERMAP_API_KEY="your_api_key_here" # Build and run - cargo build --release - ./target/release/silmataivas +cargo build --release +./target/release/silmataivas ``` -The server will listen on port 4000 by default. You can set the `DATABASE_URL` environment variable to change the database location. +The server will: +- Listen on port 4000 by default (configurable via `PORT` environment variable) +- Use `~/.local/share/silmataivas/silmataivas.db` as the default database location +- Create the database directory automatically if it doesn't exist +- Generate an admin token automatically if none is provided ### Using Docker ```bash docker build -t silmataivas . -docker run -p 4000:4000 -e DATABASE_URL=sqlite:///data/silmataivas.db silmataivas +docker run -p 4000:4000 \ + -e OPENWEATHERMAP_API_KEY="your_api_key_here" \ + -e DATABASE_URL="sqlite:///data/silmataivas.db" \ + silmataivas +``` + +## Weather Polling + +The application automatically fetches weather data from OpenWeatherMap API every hour for all users who have set their locations. Here's how it works: + +### Automatic Weather Data Collection + +1. **Hourly Scheduling**: The weather poller runs every hour in the background +2. 
**Location Rounding**: Coordinates are rounded to 2 decimal places (~1km precision) to reduce API calls for nearby locations +3. **Async Processing**: Each user-location combination is processed in parallel for efficiency +4. **Data Storage**: Weather data is stored in the `weather_api_data` table for future reference + +### Threshold Checking + +After fetching weather data, the system automatically: +1. Checks all enabled thresholds for the user +2. Compares current weather conditions against threshold values +3. Sends notifications (NTFY/SMTP) if thresholds are exceeded +4. Only sends one notification per check to avoid spam + +### Manual Weather Check + +You can manually trigger a weather check for testing: + +```bash +cargo run -- check-weather +``` + +### Environment Variables + +The application uses the following environment variables for configuration: + +#### Required +- `OPENWEATHERMAP_API_KEY`: Your OpenWeatherMap API key for weather data access + +#### Optional +- `DATABASE_URL`: Database connection string (defaults to XDG_DATA_HOME or ~/.local/share) +- `XDG_DATA_HOME`: Custom data directory (defaults to ~/.local/share) +- `ADMIN_TOKEN`: Initial admin user token (auto-generated if not set) +- `PORT`: Server port (default: 4000) +- `HOST`: Server host (default: 0.0.0.0) +- `LOG_LEVEL`: Logging level (default: info) + +#### Database URL Fallback Logic + +If `DATABASE_URL` is not set, the application will use the following fallback path: +1. `$XDG_DATA_HOME/silmataivas/silmataivas.db` (if XDG_DATA_HOME is set) +2. `~/.local/share/silmataivas/silmataivas.db` (default fallback) + +The directory will be created automatically if it doesn't exist. + +#### Example Configuration + +```bash +# Required +export OPENWEATHERMAP_API_KEY="your_api_key_here" + +# Optional - custom database location +export DATABASE_URL="sqlite:///path/to/custom/database.db" + +# Optional - custom data directory +export XDG_DATA_HOME="/custom/data/path" + +# Optional - server configuration +export PORT="8080" +export HOST="127.0.0.1" +export LOG_LEVEL="debug" ``` ## API Usage diff --git a/migrations/20240101000400_create_weather_api_data.sql b/migrations/20240101000400_create_weather_api_data.sql new file mode 100644 index 0000000..16fdf35 --- /dev/null +++ b/migrations/20240101000400_create_weather_api_data.sql @@ -0,0 +1,15 @@ +-- Migration: Create weather_api_data table +CREATE TABLE IF NOT EXISTS weather_api_data ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id INTEGER NOT NULL, + location_id INTEGER NOT NULL, + api_type TEXT NOT NULL DEFAULT 'openweathermap', + data TEXT NOT NULL, -- JSON data from the API + fetched_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE, + FOREIGN KEY(location_id) REFERENCES locations(id) ON DELETE CASCADE +); +CREATE INDEX IF NOT EXISTS idx_weather_api_data_user_id ON weather_api_data(user_id); +CREATE INDEX IF NOT EXISTS idx_weather_api_data_location_id ON weather_api_data(location_id); +CREATE INDEX IF NOT EXISTS idx_weather_api_data_fetched_at ON weather_api_data(fetched_at); +CREATE INDEX IF NOT EXISTS idx_weather_api_data_api_type ON weather_api_data(api_type); \ No newline at end of file diff --git a/src/config.rs b/src/config.rs new file mode 100644 index 0000000..186ef82 --- /dev/null +++ b/src/config.rs @@ -0,0 +1,160 @@ +use std::env; +use std::path::PathBuf; + +#[derive(Debug, Clone)] +pub struct Config { + pub database_url: String, + #[allow(dead_code)] + pub openweathermap_api_key: Option, + pub 
admin_token: Option, + pub server_port: u16, + pub server_host: String, + pub log_level: String, +} + +impl Config { + pub fn from_env() -> Self { + let database_url = Self::get_database_url(); + let openweathermap_api_key = env::var("OPENWEATHERMAP_API_KEY").ok(); + let admin_token = env::var("ADMIN_TOKEN").ok(); + let server_port = env::var("PORT") + .unwrap_or_else(|_| "4000".to_string()) + .parse() + .unwrap_or(4000); + let server_host = env::var("HOST").unwrap_or_else(|_| "0.0.0.0".to_string()); + let log_level = env::var("LOG_LEVEL").unwrap_or_else(|_| "info".to_string()); + + Self { + database_url, + openweathermap_api_key, + admin_token, + server_port, + server_host, + log_level, + } + } + + fn get_database_url() -> String { + // First, check if DATABASE_URL is explicitly set + if let Ok(url) = env::var("DATABASE_URL") { + return url; + } + + // If not set, construct the path using XDG_DATA_HOME or fallback + let data_dir = if let Ok(xdg_data_home) = env::var("XDG_DATA_HOME") { + PathBuf::from(xdg_data_home) + } else { + // Fallback to ~/.local/share + let home = env::var("HOME") + .unwrap_or_else(|_| env::var("USERPROFILE").unwrap_or_else(|_| "/tmp".to_string())); + PathBuf::from(home).join(".local").join("share") + }; + + let db_path = data_dir.join("silmataivas").join("silmataivas.db"); + + // Ensure the directory exists + if let Some(parent) = db_path.parent() { + let _ = std::fs::create_dir_all(parent); + } + + format!("sqlite://{}", db_path.display()) + } + + #[allow(dead_code)] + pub fn validate(&self) -> Result<(), String> { + if self.openweathermap_api_key.is_none() { + return Err("OPENWEATHERMAP_API_KEY environment variable is required".to_string()); + } + Ok(()) + } + + #[allow(dead_code)] + pub fn validate_optional(&self) -> Result<(), String> { + // For optional validation (e.g., during development) + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::env; + use std::sync::Once; + + static INIT: Once = Once::new(); + + fn setup() { + INIT.call_once(|| { + // Clear all environment variables that might interfere with tests + unsafe { + env::remove_var("DATABASE_URL"); + env::remove_var("XDG_DATA_HOME"); + env::remove_var("PORT"); + env::remove_var("HOST"); + env::remove_var("LOG_LEVEL"); + } + }); + + // Clear environment variables before each test + unsafe { + env::remove_var("DATABASE_URL"); + env::remove_var("XDG_DATA_HOME"); + env::remove_var("PORT"); + env::remove_var("HOST"); + env::remove_var("LOG_LEVEL"); + } + } + + #[test] + fn test_database_url_fallback() { + setup(); + + let home = env::var("HOME").unwrap_or_else(|_| "/tmp".to_string()); + let expected_path = format!("sqlite://{home}/.local/share/silmataivas/silmataivas.db"); + + let config = Config::from_env(); + assert_eq!(config.database_url, expected_path); + } + + #[test] + fn test_database_url_xdg_data_home() { + setup(); + + unsafe { + env::set_var("XDG_DATA_HOME", "/custom/data/path"); + } + + let expected_path = "sqlite:///custom/data/path/silmataivas/silmataivas.db"; + + let config = Config::from_env(); + assert_eq!(config.database_url, expected_path); + } + + #[test] + fn test_database_url_explicit() { + setup(); + + unsafe { + env::set_var("DATABASE_URL", "sqlite:///explicit/path.db"); + } + + let config = Config::from_env(); + assert_eq!(config.database_url, "sqlite:///explicit/path.db"); + } + + #[test] + fn test_server_config() { + setup(); + + unsafe { + env::set_var("PORT", "8080"); + env::set_var("HOST", "127.0.0.1"); + env::set_var("LOG_LEVEL", "debug"); + } + + let 
config = Config::from_env(); + assert_eq!(config.server_port, 8080); + assert_eq!(config.server_host, "127.0.0.1"); + assert_eq!(config.log_level, "debug"); + } +} diff --git a/src/main.rs b/src/main.rs index 4e89234..9f97e37 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,10 +6,8 @@ use axum::{Router, routing::get}; use clap::{Parser, Subcommand}; use serde_json::json; use sqlx::SqlitePool; -use std::env; use std::net::SocketAddr; use std::sync::Arc; -use tokio::fs; use tracing::{error, info}; use tracing_subscriber::{EnvFilter, FmtSubscriber}; use utoipa::OpenApi; @@ -17,10 +15,12 @@ use utoipa_swagger_ui::SwaggerUi; use uuid::Uuid; mod auth; +mod config; mod health; mod locations; mod notifications; mod users; +mod weather_api_data; mod weather_poller; mod weather_thresholds; @@ -918,31 +918,42 @@ enum Commands { Server, /// Create a new user with optional UUID CreateUser { uuid: Option }, + /// Manually trigger weather check for testing + CheckWeather, } #[tokio::main] async fn main() -> anyhow::Result<()> { let cli = Cli::parse(); - // Set up logging based on verbosity - let filter = match cli.verbose { - 0 => "warn", - 1 => "info", - 2 => "debug", - _ => "trace", + + // Load configuration + let config = config::Config::from_env(); + + // Set up logging based on verbosity or config + let filter = if cli.verbose > 0 { + match cli.verbose { + 1 => "info", + 2 => "debug", + _ => "trace", + } + } else { + &config.log_level }; + let subscriber = FmtSubscriber::builder() .with_env_filter(EnvFilter::new(filter)) .finish(); tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed"); + + info!( + "Starting Silmataivas with database: {}", + config.database_url + ); + match cli.command.unwrap_or(Commands::Server) { Commands::Server => { - // Set up database path - let db_path = env::var("DATABASE_URL") - .unwrap_or_else(|_| "sqlite://./data/silmataivas.db".to_string()); - // Ensure data directory exists - let _ = fs::create_dir_all("./data").await; - // Connect to SQLite - let pool = SqlitePool::connect(&db_path) + // Connect to database + let pool = SqlitePool::connect(&config.database_url) .await .expect("Failed to connect to DB"); @@ -951,8 +962,9 @@ async fn main() -> anyhow::Result<()> { let repo = users::UserRepository { db: &pool }; match repo.any_admin_exists().await { Ok(false) => { - let admin_token = - env::var("ADMIN_TOKEN").unwrap_or_else(|_| Uuid::new_v4().to_string()); + let admin_token = config + .admin_token + .unwrap_or_else(|| Uuid::new_v4().to_string()); match repo .create_user(Some(admin_token.clone()), Some(users::UserRole::Admin)) .await @@ -970,17 +982,42 @@ async fn main() -> anyhow::Result<()> { } } - let app = app_with_state(Arc::new(pool)); - let addr = SocketAddr::from(([0, 0, 0, 0], 4000)); + let app = app_with_state(Arc::new(pool.clone())); + let addr = SocketAddr::from(( + config + .server_host + .parse::() + .unwrap_or(std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0))), + config.server_port, + )); let listener = tokio::net::TcpListener::bind(addr) .await .expect("Failed to bind address"); info!("Listening on {}", listener.local_addr().unwrap()); - axum::serve(listener, app).await.unwrap(); + + // Start the weather scheduler in the background + let scheduler = crate::weather_poller::WeatherScheduler::new(Arc::new(pool)); + let scheduler_handle = tokio::spawn(async move { + if let Err(e) = scheduler.start().await { + error!("Weather scheduler failed: {}", e); + } + }); + + // Start the web server + let server_handle = 
axum::serve(listener, app); + + // Wait for either the server or scheduler to fail + tokio::select! { + _ = server_handle => { + error!("Web server stopped"); + } + _ = scheduler_handle => { + error!("Weather scheduler stopped"); + } + } } Commands::CreateUser { uuid } => { - let db_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set"); - let pool = SqlitePool::connect(&db_url).await?; + let pool = SqlitePool::connect(&config.database_url).await?; let repo = crate::users::UserRepository { db: &pool }; let user_id = uuid.unwrap_or_else(|| Uuid::new_v4().to_string()); let user = repo @@ -1037,6 +1074,19 @@ async fn main() -> anyhow::Result<()> { info!("User {} created", user_id); } + Commands::CheckWeather => { + let pool = SqlitePool::connect(&config.database_url) + .await + .expect("Failed to connect to DB"); + + let poller = crate::weather_poller::WeatherPoller::new(Arc::new(pool)); + info!("Manually triggering weather check..."); + + match poller.check_all().await { + Ok(()) => info!("Weather check completed successfully"), + Err(e) => error!("Weather check failed: {}", e), + } + } } Ok(()) } diff --git a/src/users.rs b/src/users.rs index 43f190d..dad24bc 100644 --- a/src/users.rs +++ b/src/users.rs @@ -20,6 +20,16 @@ pub enum UserRole { Admin, } +#[derive(Debug, Serialize, Deserialize, FromRow, Clone, PartialEq, ToSchema)] +pub struct UserWithLocation { + pub id: i64, + pub user_id: String, + pub role: UserRole, + pub location_id: i64, + pub latitude: f64, + pub longitude: f64, +} + pub struct UserRepository<'a> { pub db: &'a sqlx::SqlitePool, } @@ -79,6 +89,17 @@ impl<'a> UserRepository<'a> { Ok(()) } + pub async fn list_users_with_locations(&self) -> Result, sqlx::Error> { + sqlx::query_as::<_, UserWithLocation>( + "SELECT u.id, u.user_id, u.role, l.id as location_id, l.latitude, l.longitude + FROM users u + LEFT JOIN locations l ON u.id = l.user_id + WHERE l.id IS NOT NULL", + ) + .fetch_all(self.db) + .await + } + pub async fn any_admin_exists(&self) -> Result { let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM users WHERE role = ?") .bind(UserRole::Admin) diff --git a/src/weather_api_data.rs b/src/weather_api_data.rs new file mode 100644 index 0000000..14a1cac --- /dev/null +++ b/src/weather_api_data.rs @@ -0,0 +1,227 @@ +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use sqlx::FromRow; +use utoipa::ToSchema; + +#[derive(Debug, Serialize, Deserialize, FromRow, Clone, PartialEq, ToSchema)] +pub struct WeatherApiData { + pub id: i64, + pub user_id: i64, + pub location_id: i64, + pub api_type: String, + pub data: String, // JSON data from the API + #[sqlx(rename = "fetched_at")] + pub fetched_at: String, // Store as string, convert when needed +} + +impl WeatherApiData { + #[allow(dead_code)] + pub fn fetched_at_datetime(&self) -> Result, chrono::ParseError> { + DateTime::parse_from_rfc3339(&self.fetched_at).map(|dt| dt.with_timezone(&Utc)) + } +} + +pub struct WeatherApiDataRepository<'a> { + pub db: &'a sqlx::SqlitePool, +} + +impl<'a> WeatherApiDataRepository<'a> { + pub async fn create_weather_data( + &self, + user_id: i64, + location_id: i64, + api_type: String, + data: String, + ) -> Result { + sqlx::query_as::<_, WeatherApiData>( + "INSERT INTO weather_api_data (user_id, location_id, api_type, data) VALUES (?, ?, ?, ?) 
RETURNING id, user_id, location_id, api_type, data, fetched_at" + ) + .bind(user_id) + .bind(location_id) + .bind(api_type) + .bind(data) + .fetch_one(self.db) + .await + } + + #[allow(dead_code)] + pub async fn get_latest_weather_data( + &self, + user_id: i64, + location_id: i64, + api_type: &str, + ) -> Result, sqlx::Error> { + sqlx::query_as::<_, WeatherApiData>( + "SELECT id, user_id, location_id, api_type, data, fetched_at FROM weather_api_data WHERE user_id = ? AND location_id = ? AND api_type = ? ORDER BY fetched_at DESC LIMIT 1" + ) + .bind(user_id) + .bind(location_id) + .bind(api_type) + .fetch_optional(self.db) + .await + } + + #[allow(dead_code)] + pub async fn list_weather_data_by_user( + &self, + user_id: i64, + limit: Option, + ) -> Result, sqlx::Error> { + let limit = limit.unwrap_or(100); + sqlx::query_as::<_, WeatherApiData>( + "SELECT id, user_id, location_id, api_type, data, fetched_at FROM weather_api_data WHERE user_id = ? ORDER BY fetched_at DESC LIMIT ?" + ) + .bind(user_id) + .bind(limit) + .fetch_all(self.db) + .await + } + + #[allow(dead_code)] + pub async fn delete_old_weather_data(&self, days_old: i64) -> Result { + let result = sqlx::query( + "DELETE FROM weather_api_data WHERE fetched_at < datetime('now', '-{} days')", + ) + .bind(days_old) + .execute(self.db) + .await?; + Ok(result.rows_affected()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::locations::LocationRepository; + use crate::users::{UserRepository, UserRole}; + use sqlx::{Executor, SqlitePool}; + + async fn setup_db() -> SqlitePool { + let pool = SqlitePool::connect(":memory:").await.unwrap(); + pool.execute( + "CREATE TABLE users ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id TEXT NOT NULL UNIQUE, + role TEXT NOT NULL DEFAULT 'user' + );", + ) + .await + .unwrap(); + pool.execute( + "CREATE TABLE locations ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + latitude REAL NOT NULL, + longitude REAL NOT NULL, + user_id INTEGER NOT NULL, + FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE NO ACTION + );", + ) + .await + .unwrap(); + pool.execute( + "CREATE TABLE weather_api_data ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id INTEGER NOT NULL, + location_id INTEGER NOT NULL, + api_type TEXT NOT NULL DEFAULT 'openweathermap', + data TEXT NOT NULL, + fetched_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE, + FOREIGN KEY(location_id) REFERENCES locations(id) ON DELETE CASCADE + );", + ) + .await + .unwrap(); + pool + } + + async fn create_user(pool: &SqlitePool) -> i64 { + let repo = UserRepository { db: pool }; + let user = repo + .create_user(Some("test_user".to_string()), Some(UserRole::User)) + .await + .unwrap(); + user.id + } + + async fn create_location(pool: &SqlitePool, user_id: i64) -> i64 { + let repo = LocationRepository { db: pool }; + let location = repo + .create_location(60.1699, 24.9384, user_id) // Helsinki coordinates + .await + .unwrap(); + location.id + } + + #[tokio::test] + async fn test_create_and_get_weather_data() { + let pool = setup_db().await; + let user_id = create_user(&pool).await; + let location_id = create_location(&pool, user_id).await; + + let repo = WeatherApiDataRepository { db: &pool }; + let test_data = r#"{"temp": 20.5, "humidity": 65}"#.to_string(); + + let weather_data = repo + .create_weather_data( + user_id, + location_id, + "openweathermap".to_string(), + test_data.clone(), + ) + .await + .unwrap(); + + assert_eq!(weather_data.user_id, user_id); + 
assert_eq!(weather_data.location_id, location_id); + assert_eq!(weather_data.api_type, "openweathermap"); + assert_eq!(weather_data.data, test_data); + + let latest = repo + .get_latest_weather_data(user_id, location_id, "openweathermap") + .await + .unwrap() + .unwrap(); + + assert_eq!(latest.id, weather_data.id); + assert_eq!(latest.data, test_data); + } + + #[tokio::test] + async fn test_list_weather_data_by_user() { + let pool = setup_db().await; + let user_id = create_user(&pool).await; + let location_id = create_location(&pool, user_id).await; + + let repo = WeatherApiDataRepository { db: &pool }; + let test_data1 = r#"{"temp": 20.5}"#.to_string(); + let test_data2 = r#"{"temp": 22.0}"#.to_string(); + + repo.create_weather_data( + user_id, + location_id, + "openweathermap".to_string(), + test_data1.clone(), + ) + .await + .unwrap(); + repo.create_weather_data( + user_id, + location_id, + "openweathermap".to_string(), + test_data2.clone(), + ) + .await + .unwrap(); + + let data_list = repo + .list_weather_data_by_user(user_id, Some(10)) + .await + .unwrap(); + assert_eq!(data_list.len(), 2); + // Check that both data entries are present (order may vary) + let data_values: Vec<&str> = data_list.iter().map(|d| d.data.as_str()).collect(); + assert!(data_values.contains(&test_data1.as_str())); + assert!(data_values.contains(&test_data2.as_str())); + } +} diff --git a/src/weather_poller.rs b/src/weather_poller.rs index ea211b7..76f3fda 100644 --- a/src/weather_poller.rs +++ b/src/weather_poller.rs @@ -1 +1,502 @@ -// Remove all unused imports at the top of the file. +use crate::{ + notifications::NtfySettingsRepository, + users::{UserRepository, UserWithLocation}, + weather_api_data::WeatherApiDataRepository, + weather_thresholds::WeatherThresholdRepository, +}; +use anyhow::Result; +use chrono::{DateTime, Utc}; +use reqwest::Client; +use serde_json::Value; +use sqlx::SqlitePool; +use std::env; +use std::sync::Arc; +use tokio::time::{Duration, sleep}; +use tracing::{error, info, warn}; + +const API_URL: &str = "https://api.openweathermap.org/data/2.5/forecast"; +const ALERT_WINDOW_HOURS: i64 = 24; + +pub struct WeatherPoller { + pool: Arc, + client: Client, +} + +impl WeatherPoller { + pub fn new(pool: Arc) -> Self { + Self { + pool, + client: Client::new(), + } + } + + /// Main function to check weather for all users with locations + pub async fn check_all(&self) -> Result<()> { + info!("🔄 Checking weather forecast for all users..."); + + let user_repo = UserRepository { db: &self.pool }; + let users_with_locations = user_repo.list_users_with_locations().await?; + + if users_with_locations.is_empty() { + info!("No users with locations found"); + return Ok(()); + } + + info!("Found {} users with locations", users_with_locations.len()); + + // Spawn async tasks for each user-location combination + let mut handles = Vec::new(); + for user_with_location in users_with_locations { + let poller = self.clone(); + let handle = + tokio::spawn(async move { poller.check_user_weather(user_with_location).await }); + handles.push(handle); + } + + // Wait for all tasks to complete + for handle in handles { + if let Err(e) = handle.await { + error!("Task failed: {}", e); + } + } + + Ok(()) + } + + /// Check weather for a specific user and location + async fn check_user_weather(&self, user_with_location: UserWithLocation) -> Result<()> { + let user_id = user_with_location.id; + let location_id = user_with_location.location_id; + let lat = user_with_location.latitude; + let lon = 
user_with_location.longitude; + + info!( + "Checking weather for user {} at location {}", + user_id, location_id + ); + + // Round coordinates (similar to Elixir project) + let (rounded_lat, rounded_lon) = self.round_coordinates(lat, lon); + + match self.fetch_forecast(rounded_lat, rounded_lon).await { + Ok(forecast_data) => { + // Store the weather data + let api_data_repo = WeatherApiDataRepository { db: &self.pool }; + let json_data = serde_json::to_string(&forecast_data)?; + + api_data_repo + .create_weather_data( + user_id, + location_id, + "openweathermap".to_string(), + json_data, + ) + .await?; + + info!( + "Weather data stored for user {} location {}", + user_id, location_id + ); + + // Check thresholds after storing data + self.check_thresholds(user_id, location_id, &forecast_data) + .await?; + } + Err(e) => { + error!("❌ Error fetching forecast for user {}: {}", user_id, e); + } + } + + Ok(()) + } + + /// Round coordinates to reduce API calls for nearby locations + fn round_coordinates(&self, lat: f64, lon: f64) -> (f64, f64) { + // Round to 2 decimal places (approximately 1km precision) + let rounded_lat = (lat * 100.0).round() / 100.0; + let rounded_lon = (lon * 100.0).round() / 100.0; + (rounded_lat, rounded_lon) + } + + /// Fetch weather forecast from OpenWeatherMap API + async fn fetch_forecast(&self, lat: f64, lon: f64) -> Result { + let api_key = env::var("OPENWEATHERMAP_API_KEY") + .map_err(|_| anyhow::anyhow!("OPENWEATHERMAP_API_KEY environment variable not set"))?; + + let response = self + .client + .get(API_URL) + .query(&[ + ("lat", lat.to_string()), + ("lon", lon.to_string()), + ("units", "metric".to_string()), + ("appid", api_key), + ]) + .send() + .await?; + + if !response.status().is_success() { + let status = response.status(); + let body = response.text().await?; + return Err(anyhow::anyhow!("API request failed: {} - {}", status, body)); + } + + let data: Value = response.json().await?; + Ok(data) + } + + /// Check weather thresholds and send notifications if needed + async fn check_thresholds( + &self, + user_id: i64, + _location_id: i64, + forecast_data: &Value, + ) -> Result<()> { + let threshold_repo = WeatherThresholdRepository { db: &self.pool }; + let thresholds = threshold_repo.list_thresholds(user_id).await?; + + if thresholds.is_empty() { + return Ok(()); + } + + // Get the forecast list from the API response + let forecast_list = forecast_data + .get("list") + .and_then(|v| v.as_array()) + .ok_or_else(|| anyhow::anyhow!("Invalid forecast data structure"))?; + + let now = Utc::now(); + + // Check each forecast entry within the alert window + for forecast_entry in forecast_list { + if let Some(timestamp) = forecast_entry.get("dt").and_then(|v| v.as_i64()) { + let forecast_time = DateTime::from_timestamp(timestamp, 0) + .ok_or_else(|| anyhow::anyhow!("Invalid timestamp"))?; + + let hours_diff = (forecast_time - now).num_hours(); + if hours_diff > ALERT_WINDOW_HOURS { + continue; // Skip forecasts beyond our alert window + } + + // Check if any thresholds are exceeded + for threshold in &thresholds { + if !threshold.enabled { + continue; + } + + if self.is_threshold_exceeded(forecast_entry, threshold)? 
{ + info!( + "🚨 Threshold exceeded for user {}: {} {} {}", + user_id, + threshold.condition_type, + threshold.operator, + threshold.threshold_value + ); + + // Send notification + self.send_notification(user_id, threshold, forecast_entry) + .await?; + + // Only send one notification per check to avoid spam + return Ok(()); + } + } + } + } + + Ok(()) + } + + /// Check if a specific threshold is exceeded + fn is_threshold_exceeded( + &self, + forecast_entry: &Value, + threshold: &crate::weather_thresholds::WeatherThreshold, + ) -> Result { + let value = match threshold.condition_type.as_str() { + "temperature" => forecast_entry + .get("main") + .and_then(|m| m.get("temp")) + .and_then(|t| t.as_f64()) + .ok_or_else(|| anyhow::anyhow!("Temperature not found in forecast"))?, + "rain" => forecast_entry + .get("rain") + .and_then(|r| r.get("3h")) + .and_then(|r| r.as_f64()) + .unwrap_or(0.0), + "wind_speed" => { + let wind_speed = forecast_entry + .get("wind") + .and_then(|w| w.get("speed")) + .and_then(|s| s.as_f64()) + .ok_or_else(|| anyhow::anyhow!("Wind speed not found in forecast"))?; + // Convert m/s to km/h + wind_speed * 3.6 + } + _ => { + warn!("Unknown condition type: {}", threshold.condition_type); + return Ok(false); + } + }; + + let threshold_value = threshold.threshold_value; + let exceeded = match threshold.operator.as_str() { + ">" => value > threshold_value, + ">=" => value >= threshold_value, + "<" => value < threshold_value, + "<=" => value <= threshold_value, + "==" => (value - threshold_value).abs() < 0.1, // Small tolerance for floating point + _ => { + warn!("Unknown operator: {}", threshold.operator); + false + } + }; + + if exceeded { + info!( + "🚨 {} threshold exceeded: {} {} {} (value: {})", + threshold.condition_type, value, threshold.operator, threshold_value, value + ); + } + + Ok(exceeded) + } + + /// Send notification to user + async fn send_notification( + &self, + user_id: i64, + threshold: &crate::weather_thresholds::WeatherThreshold, + _forecast_entry: &Value, + ) -> Result<()> { + let ntfy_repo = NtfySettingsRepository { db: &self.pool }; + + if let Ok(Some(ntfy_settings)) = ntfy_repo.get_by_user(user_id).await { + if ntfy_settings.enabled { + // Create notification message + let title = format!( + "Weather Alert: {} {} {}", + threshold.condition_type, threshold.operator, threshold.threshold_value + ); + + let message = format!( + "Weather threshold exceeded for condition: {} {} {}", + threshold.condition_type, threshold.operator, threshold.threshold_value + ); + + // Send NTFY notification + self.send_ntfy_notification(&ntfy_settings, &title, &message) + .await?; + } + } + + Ok(()) + } + + /// Send NTFY notification + async fn send_ntfy_notification( + &self, + ntfy_settings: &crate::notifications::NtfySettings, + title: &str, + message: &str, + ) -> Result<()> { + let url = format!("{}/{}", ntfy_settings.server_url, ntfy_settings.topic); + + let response = self + .client + .post(&url) + .header("Title", title) + .header("Priority", ntfy_settings.priority.to_string()) + .body(message.to_string()) + .send() + .await?; + + if response.status().is_success() { + info!("NTFY notification sent successfully"); + } else { + error!("Failed to send NTFY notification: {}", response.status()); + } + + Ok(()) + } +} + +impl Clone for WeatherPoller { + fn clone(&self) -> Self { + Self { + pool: Arc::clone(&self.pool), + client: Client::new(), + } + } +} + +/// Background task scheduler for weather polling +pub struct WeatherScheduler { + poller: WeatherPoller, +} + +impl 
WeatherScheduler { + pub fn new(pool: Arc) -> Self { + Self { + poller: WeatherPoller::new(pool), + } + } + + /// Start the scheduler to run weather checks hourly + pub async fn start(&self) -> Result<()> { + info!("Starting weather scheduler..."); + + loop { + info!("Running scheduled weather check..."); + + if let Err(e) = self.poller.check_all().await { + error!("Weather check failed: {}", e); + } + + // Wait for 1 hour before next check + sleep(Duration::from_secs(3600)).await; + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use sqlx::{Executor, SqlitePool}; + + #[allow(dead_code)] + async fn setup_db() -> SqlitePool { + let pool = SqlitePool::connect(":memory:").await.unwrap(); + pool.execute( + "CREATE TABLE users ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id TEXT NOT NULL UNIQUE, + role TEXT NOT NULL DEFAULT 'user' + );", + ) + .await + .unwrap(); + pool.execute( + "CREATE TABLE locations ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + latitude REAL NOT NULL, + longitude REAL NOT NULL, + user_id INTEGER NOT NULL, + FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE NO ACTION + );", + ) + .await + .unwrap(); + pool.execute( + "CREATE TABLE weather_api_data ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id INTEGER NOT NULL, + location_id INTEGER NOT NULL, + api_type TEXT NOT NULL DEFAULT 'openweathermap', + data TEXT NOT NULL, + fetched_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE, + FOREIGN KEY(location_id) REFERENCES locations(id) ON DELETE CASCADE + );", + ) + .await + .unwrap(); + pool.execute( + "CREATE TABLE weather_thresholds ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id INTEGER NOT NULL, + condition_type TEXT NOT NULL, + threshold_value REAL NOT NULL, + operator TEXT NOT NULL, + enabled BOOLEAN NOT NULL DEFAULT 1, + description TEXT, + FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE + );", + ) + .await + .unwrap(); + pool.execute( + "CREATE TABLE user_notification_settings ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id INTEGER NOT NULL, + ntfy_enabled BOOLEAN NOT NULL DEFAULT 0, + ntfy_topic TEXT, + ntfy_server_url TEXT, + ntfy_priority INTEGER NOT NULL DEFAULT 3, + ntfy_title_template TEXT, + ntfy_message_template TEXT, + smtp_enabled BOOLEAN NOT NULL DEFAULT 0, + smtp_email TEXT, + smtp_server TEXT, + smtp_port INTEGER, + smtp_username TEXT, + smtp_password TEXT, + smtp_use_tls BOOLEAN NOT NULL DEFAULT 1, + smtp_from_email TEXT, + smtp_from_name TEXT, + smtp_subject_template TEXT, + smtp_body_template TEXT, + FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE + );", + ) + .await + .unwrap(); + pool + } + + #[tokio::test] + async fn test_round_coordinates() { + let pool = SqlitePool::connect(":memory:").await.unwrap(); + let poller = WeatherPoller::new(Arc::new(pool)); + + let (lat, lon) = poller.round_coordinates(60.1699, 24.9384); + assert_eq!(lat, 60.17); + assert_eq!(lon, 24.94); + + let (lat, lon) = poller.round_coordinates(60.1649, 24.9334); + assert_eq!(lat, 60.16); + assert_eq!(lon, 24.93); + } + + #[tokio::test] + async fn test_is_threshold_exceeded() { + let pool = SqlitePool::connect(":memory:").await.unwrap(); + let poller = WeatherPoller::new(Arc::new(pool)); + + let forecast_entry = serde_json::json!({ + "main": {"temp": 25.0}, + "wind": {"speed": 10.0}, + "rain": {"3h": 30.0} + }); + + let threshold = crate::weather_thresholds::WeatherThreshold { + id: 1, + user_id: 1, + condition_type: "temperature".to_string(), + threshold_value: 20.0, + operator: 
">".to_string(), + enabled: true, + description: None, + }; + + let exceeded = poller + .is_threshold_exceeded(&forecast_entry, &threshold) + .unwrap(); + assert!(exceeded); + + let threshold = crate::weather_thresholds::WeatherThreshold { + id: 1, + user_id: 1, + condition_type: "temperature".to_string(), + threshold_value: 30.0, + operator: ">".to_string(), + enabled: true, + description: None, + }; + + let exceeded = poller + .is_threshold_exceeded(&forecast_entry, &threshold) + .unwrap(); + assert!(!exceeded); + } +} -- cgit v1.2.3
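A note on the payload stored in `weather_api_data.data`: `check_thresholds` only navigates `list[].dt`, `list[].main.temp`, `list[].wind.speed` and `list[].rain.3h` of the OpenWeatherMap `/data/2.5/forecast` response. A minimal sketch of that shape, with invented values; the real response carries many more fields, all of which are stored verbatim:

```rust
use serde_json::{Value, json};

// Illustrative only: the subset of the /data/2.5/forecast response that
// check_thresholds() reads. Values are made up; the full payload (city, cnt,
// clouds, ...) is what actually lands in weather_api_data.data.
fn sample_forecast() -> Value {
    json!({
        "list": [
            {
                "dt": 1_752_000_000,      // unix time of the 3-hour forecast slot
                "main": { "temp": 21.3 }, // degrees C, since the request passes units=metric
                "wind": { "speed": 4.2 }, // m/s; is_threshold_exceeded converts to km/h (x 3.6)
                "rain": { "3h": 0.6 }     // mm over 3 h; the API omits "rain" when dry
            }
        ]
    })
}
```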
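The thresholds that `check_thresholds` compares against live in the `weather_thresholds` table, whose schema appears in the weather_poller test setup above. They are presumably managed through the existing REST API rather than raw SQL, but as a self-contained sketch of what a "temperature > 30" rule for user 1 looks like at the storage level:

```rust
use sqlx::SqlitePool;

// Illustrative only: seed one threshold row directly, mirroring the
// weather_thresholds columns used in the tests. The user id is assumed to exist.
async fn seed_example_threshold(pool: &SqlitePool) -> Result<(), sqlx::Error> {
    sqlx::query(
        "INSERT INTO weather_thresholds
             (user_id, condition_type, threshold_value, operator, enabled, description)
         VALUES (?, ?, ?, ?, ?, ?)",
    )
    .bind(1_i64)          // user id
    .bind("temperature")  // compared against list[].main.temp
    .bind(30.0_f64)       // degrees C
    .bind(">")            // one of >, >=, <, <=, ==
    .bind(true)           // enabled
    .bind("Heat warning") // free-form description
    .execute(pool)
    .await?;
    Ok(())
}
```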
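On reading rows back: `fetched_at` is filled by the migration's `DEFAULT CURRENT_TIMESTAMP`, which SQLite renders as `YYYY-MM-DD HH:MM:SS`, so `WeatherApiData::fetched_at_datetime` (which only tries RFC 3339) will generally return an error for rows written through `create_weather_data`. A small sketch of a parser that accepts both forms; the helper name is mine, and it assumes the stored timestamp is UTC:

```rust
use chrono::{DateTime, NaiveDateTime, TimeZone, Utc};

// Illustrative only: try RFC 3339 first, then SQLite's CURRENT_TIMESTAMP
// format, treating the naive value as UTC.
fn parse_fetched_at(raw: &str) -> Result<DateTime<Utc>, chrono::ParseError> {
    DateTime::parse_from_rfc3339(raw)
        .map(|dt| dt.with_timezone(&Utc))
        .or_else(|_| {
            NaiveDateTime::parse_from_str(raw, "%Y-%m-%d %H:%M:%S")
                .map(|naive| Utc.from_utc_datetime(&naive))
        })
}
```

The other way to make the two sides agree would be to write an RFC 3339 string explicitly at insert time instead of relying on the column default.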
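One more sketch, for `delete_old_weather_data`: sqlx passes the SQL string through as-is, so the literal `-{} days` modifier never receives `days_old`, and the `.bind(days_old)` has no `?` placeholder to attach to. Building the modifier with SQLite string concatenation keeps the statement static while actually using the bound parameter; a drop-in sketch of the method for `WeatherApiDataRepository` under that assumption:

```rust
// Illustrative rewrite: the '-N days' modifier is built from the bound
// parameter, so days_old actually takes effect.
pub async fn delete_old_weather_data(&self, days_old: i64) -> Result<u64, sqlx::Error> {
    let result = sqlx::query(
        "DELETE FROM weather_api_data
         WHERE fetched_at < datetime('now', '-' || ? || ' days')",
    )
    .bind(days_old)
    .execute(self.db)
    .await?;
    Ok(result.rows_affected())
}
```

Since `days_old` is an `i64`, formatting it straight into the SQL with `format!` would also be safe; the concatenation form just keeps a single prepared statement.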