summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/config.rs160
-rw-r--r--src/main.rs94
-rw-r--r--src/users.rs21
-rw-r--r--src/weather_api_data.rs227
-rw-r--r--src/weather_poller.rs503
5 files changed, 982 insertions, 23 deletions
diff --git a/src/config.rs b/src/config.rs
new file mode 100644
index 0000000..186ef82
--- /dev/null
+++ b/src/config.rs
@@ -0,0 +1,160 @@
+use std::env;
+use std::path::PathBuf;
+
+#[derive(Debug, Clone)]
+pub struct Config {
+ pub database_url: String,
+ #[allow(dead_code)]
+ pub openweathermap_api_key: Option<String>,
+ pub admin_token: Option<String>,
+ pub server_port: u16,
+ pub server_host: String,
+ pub log_level: String,
+}
+
+impl Config {
+ pub fn from_env() -> Self {
+ let database_url = Self::get_database_url();
+ let openweathermap_api_key = env::var("OPENWEATHERMAP_API_KEY").ok();
+ let admin_token = env::var("ADMIN_TOKEN").ok();
+ let server_port = env::var("PORT")
+ .unwrap_or_else(|_| "4000".to_string())
+ .parse()
+ .unwrap_or(4000);
+ let server_host = env::var("HOST").unwrap_or_else(|_| "0.0.0.0".to_string());
+ let log_level = env::var("LOG_LEVEL").unwrap_or_else(|_| "info".to_string());
+
+ Self {
+ database_url,
+ openweathermap_api_key,
+ admin_token,
+ server_port,
+ server_host,
+ log_level,
+ }
+ }
+
+ fn get_database_url() -> String {
+ // First, check if DATABASE_URL is explicitly set
+ if let Ok(url) = env::var("DATABASE_URL") {
+ return url;
+ }
+
+ // If not set, construct the path using XDG_DATA_HOME or fallback
+ let data_dir = if let Ok(xdg_data_home) = env::var("XDG_DATA_HOME") {
+ PathBuf::from(xdg_data_home)
+ } else {
+ // Fallback to ~/.local/share
+ let home = env::var("HOME")
+ .unwrap_or_else(|_| env::var("USERPROFILE").unwrap_or_else(|_| "/tmp".to_string()));
+ PathBuf::from(home).join(".local").join("share")
+ };
+
+ let db_path = data_dir.join("silmataivas").join("silmataivas.db");
+
+ // Ensure the directory exists
+ if let Some(parent) = db_path.parent() {
+ let _ = std::fs::create_dir_all(parent);
+ }
+
+ format!("sqlite://{}", db_path.display())
+ }
+
+ #[allow(dead_code)]
+ pub fn validate(&self) -> Result<(), String> {
+ if self.openweathermap_api_key.is_none() {
+ return Err("OPENWEATHERMAP_API_KEY environment variable is required".to_string());
+ }
+ Ok(())
+ }
+
+ #[allow(dead_code)]
+ pub fn validate_optional(&self) -> Result<(), String> {
+ // For optional validation (e.g., during development)
+ Ok(())
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use std::env;
+ use std::sync::Once;
+
+ static INIT: Once = Once::new();
+
+ fn setup() {
+ INIT.call_once(|| {
+ // Clear all environment variables that might interfere with tests
+ unsafe {
+ env::remove_var("DATABASE_URL");
+ env::remove_var("XDG_DATA_HOME");
+ env::remove_var("PORT");
+ env::remove_var("HOST");
+ env::remove_var("LOG_LEVEL");
+ }
+ });
+
+ // Clear environment variables before each test
+ unsafe {
+ env::remove_var("DATABASE_URL");
+ env::remove_var("XDG_DATA_HOME");
+ env::remove_var("PORT");
+ env::remove_var("HOST");
+ env::remove_var("LOG_LEVEL");
+ }
+ }
+
+ #[test]
+ fn test_database_url_fallback() {
+ setup();
+
+ let home = env::var("HOME").unwrap_or_else(|_| "/tmp".to_string());
+ let expected_path = format!("sqlite://{home}/.local/share/silmataivas/silmataivas.db");
+
+ let config = Config::from_env();
+ assert_eq!(config.database_url, expected_path);
+ }
+
+ #[test]
+ fn test_database_url_xdg_data_home() {
+ setup();
+
+ unsafe {
+ env::set_var("XDG_DATA_HOME", "/custom/data/path");
+ }
+
+ let expected_path = "sqlite:///custom/data/path/silmataivas/silmataivas.db";
+
+ let config = Config::from_env();
+ assert_eq!(config.database_url, expected_path);
+ }
+
+ #[test]
+ fn test_database_url_explicit() {
+ setup();
+
+ unsafe {
+ env::set_var("DATABASE_URL", "sqlite:///explicit/path.db");
+ }
+
+ let config = Config::from_env();
+ assert_eq!(config.database_url, "sqlite:///explicit/path.db");
+ }
+
+ #[test]
+ fn test_server_config() {
+ setup();
+
+ unsafe {
+ env::set_var("PORT", "8080");
+ env::set_var("HOST", "127.0.0.1");
+ env::set_var("LOG_LEVEL", "debug");
+ }
+
+ let config = Config::from_env();
+ assert_eq!(config.server_port, 8080);
+ assert_eq!(config.server_host, "127.0.0.1");
+ assert_eq!(config.log_level, "debug");
+ }
+}
diff --git a/src/main.rs b/src/main.rs
index 4e89234..9f97e37 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -6,10 +6,8 @@ use axum::{Router, routing::get};
use clap::{Parser, Subcommand};
use serde_json::json;
use sqlx::SqlitePool;
-use std::env;
use std::net::SocketAddr;
use std::sync::Arc;
-use tokio::fs;
use tracing::{error, info};
use tracing_subscriber::{EnvFilter, FmtSubscriber};
use utoipa::OpenApi;
@@ -17,10 +15,12 @@ use utoipa_swagger_ui::SwaggerUi;
use uuid::Uuid;
mod auth;
+mod config;
mod health;
mod locations;
mod notifications;
mod users;
+mod weather_api_data;
mod weather_poller;
mod weather_thresholds;
@@ -918,31 +918,42 @@ enum Commands {
Server,
/// Create a new user with optional UUID
CreateUser { uuid: Option<String> },
+ /// Manually trigger weather check for testing
+ CheckWeather,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let cli = Cli::parse();
- // Set up logging based on verbosity
- let filter = match cli.verbose {
- 0 => "warn",
- 1 => "info",
- 2 => "debug",
- _ => "trace",
+
+ // Load configuration
+ let config = config::Config::from_env();
+
+ // Set up logging based on verbosity or config
+ let filter = if cli.verbose > 0 {
+ match cli.verbose {
+ 1 => "info",
+ 2 => "debug",
+ _ => "trace",
+ }
+ } else {
+ &config.log_level
};
+
let subscriber = FmtSubscriber::builder()
.with_env_filter(EnvFilter::new(filter))
.finish();
tracing::subscriber::set_global_default(subscriber).expect("setting default subscriber failed");
+
+ info!(
+ "Starting Silmataivas with database: {}",
+ config.database_url
+ );
+
match cli.command.unwrap_or(Commands::Server) {
Commands::Server => {
- // Set up database path
- let db_path = env::var("DATABASE_URL")
- .unwrap_or_else(|_| "sqlite://./data/silmataivas.db".to_string());
- // Ensure data directory exists
- let _ = fs::create_dir_all("./data").await;
- // Connect to SQLite
- let pool = SqlitePool::connect(&db_path)
+ // Connect to database
+ let pool = SqlitePool::connect(&config.database_url)
.await
.expect("Failed to connect to DB");
@@ -951,8 +962,9 @@ async fn main() -> anyhow::Result<()> {
let repo = users::UserRepository { db: &pool };
match repo.any_admin_exists().await {
Ok(false) => {
- let admin_token =
- env::var("ADMIN_TOKEN").unwrap_or_else(|_| Uuid::new_v4().to_string());
+ let admin_token = config
+ .admin_token
+ .unwrap_or_else(|| Uuid::new_v4().to_string());
match repo
.create_user(Some(admin_token.clone()), Some(users::UserRole::Admin))
.await
@@ -970,17 +982,42 @@ async fn main() -> anyhow::Result<()> {
}
}
- let app = app_with_state(Arc::new(pool));
- let addr = SocketAddr::from(([0, 0, 0, 0], 4000));
+ let app = app_with_state(Arc::new(pool.clone()));
+ let addr = SocketAddr::from((
+ config
+ .server_host
+ .parse::<std::net::IpAddr>()
+ .unwrap_or(std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0))),
+ config.server_port,
+ ));
let listener = tokio::net::TcpListener::bind(addr)
.await
.expect("Failed to bind address");
info!("Listening on {}", listener.local_addr().unwrap());
- axum::serve(listener, app).await.unwrap();
+
+ // Start the weather scheduler in the background
+ let scheduler = crate::weather_poller::WeatherScheduler::new(Arc::new(pool));
+ let scheduler_handle = tokio::spawn(async move {
+ if let Err(e) = scheduler.start().await {
+ error!("Weather scheduler failed: {}", e);
+ }
+ });
+
+ // Start the web server
+ let server_handle = axum::serve(listener, app);
+
+ // Wait for either the server or scheduler to fail
+ tokio::select! {
+ _ = server_handle => {
+ error!("Web server stopped");
+ }
+ _ = scheduler_handle => {
+ error!("Weather scheduler stopped");
+ }
+ }
}
Commands::CreateUser { uuid } => {
- let db_url = env::var("DATABASE_URL").expect("DATABASE_URL must be set");
- let pool = SqlitePool::connect(&db_url).await?;
+ let pool = SqlitePool::connect(&config.database_url).await?;
let repo = crate::users::UserRepository { db: &pool };
let user_id = uuid.unwrap_or_else(|| Uuid::new_v4().to_string());
let user = repo
@@ -1037,6 +1074,19 @@ async fn main() -> anyhow::Result<()> {
info!("User {} created", user_id);
}
+ Commands::CheckWeather => {
+ let pool = SqlitePool::connect(&config.database_url)
+ .await
+ .expect("Failed to connect to DB");
+
+ let poller = crate::weather_poller::WeatherPoller::new(Arc::new(pool));
+ info!("Manually triggering weather check...");
+
+ match poller.check_all().await {
+ Ok(()) => info!("Weather check completed successfully"),
+ Err(e) => error!("Weather check failed: {}", e),
+ }
+ }
}
Ok(())
}
diff --git a/src/users.rs b/src/users.rs
index 43f190d..dad24bc 100644
--- a/src/users.rs
+++ b/src/users.rs
@@ -20,6 +20,16 @@ pub enum UserRole {
Admin,
}
+#[derive(Debug, Serialize, Deserialize, FromRow, Clone, PartialEq, ToSchema)]
+pub struct UserWithLocation {
+ pub id: i64,
+ pub user_id: String,
+ pub role: UserRole,
+ pub location_id: i64,
+ pub latitude: f64,
+ pub longitude: f64,
+}
+
pub struct UserRepository<'a> {
pub db: &'a sqlx::SqlitePool,
}
@@ -79,6 +89,17 @@ impl<'a> UserRepository<'a> {
Ok(())
}
+ pub async fn list_users_with_locations(&self) -> Result<Vec<UserWithLocation>, sqlx::Error> {
+ sqlx::query_as::<_, UserWithLocation>(
+ "SELECT u.id, u.user_id, u.role, l.id as location_id, l.latitude, l.longitude
+ FROM users u
+ LEFT JOIN locations l ON u.id = l.user_id
+ WHERE l.id IS NOT NULL",
+ )
+ .fetch_all(self.db)
+ .await
+ }
+
pub async fn any_admin_exists(&self) -> Result<bool, sqlx::Error> {
let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM users WHERE role = ?")
.bind(UserRole::Admin)
diff --git a/src/weather_api_data.rs b/src/weather_api_data.rs
new file mode 100644
index 0000000..14a1cac
--- /dev/null
+++ b/src/weather_api_data.rs
@@ -0,0 +1,227 @@
+use chrono::{DateTime, Utc};
+use serde::{Deserialize, Serialize};
+use sqlx::FromRow;
+use utoipa::ToSchema;
+
+#[derive(Debug, Serialize, Deserialize, FromRow, Clone, PartialEq, ToSchema)]
+pub struct WeatherApiData {
+ pub id: i64,
+ pub user_id: i64,
+ pub location_id: i64,
+ pub api_type: String,
+ pub data: String, // JSON data from the API
+ #[sqlx(rename = "fetched_at")]
+ pub fetched_at: String, // Store as string, convert when needed
+}
+
+impl WeatherApiData {
+ #[allow(dead_code)]
+ pub fn fetched_at_datetime(&self) -> Result<DateTime<Utc>, chrono::ParseError> {
+ DateTime::parse_from_rfc3339(&self.fetched_at).map(|dt| dt.with_timezone(&Utc))
+ }
+}
+
+pub struct WeatherApiDataRepository<'a> {
+ pub db: &'a sqlx::SqlitePool,
+}
+
+impl<'a> WeatherApiDataRepository<'a> {
+ pub async fn create_weather_data(
+ &self,
+ user_id: i64,
+ location_id: i64,
+ api_type: String,
+ data: String,
+ ) -> Result<WeatherApiData, sqlx::Error> {
+ sqlx::query_as::<_, WeatherApiData>(
+ "INSERT INTO weather_api_data (user_id, location_id, api_type, data) VALUES (?, ?, ?, ?) RETURNING id, user_id, location_id, api_type, data, fetched_at"
+ )
+ .bind(user_id)
+ .bind(location_id)
+ .bind(api_type)
+ .bind(data)
+ .fetch_one(self.db)
+ .await
+ }
+
+ #[allow(dead_code)]
+ pub async fn get_latest_weather_data(
+ &self,
+ user_id: i64,
+ location_id: i64,
+ api_type: &str,
+ ) -> Result<Option<WeatherApiData>, sqlx::Error> {
+ sqlx::query_as::<_, WeatherApiData>(
+ "SELECT id, user_id, location_id, api_type, data, fetched_at FROM weather_api_data WHERE user_id = ? AND location_id = ? AND api_type = ? ORDER BY fetched_at DESC LIMIT 1"
+ )
+ .bind(user_id)
+ .bind(location_id)
+ .bind(api_type)
+ .fetch_optional(self.db)
+ .await
+ }
+
+ #[allow(dead_code)]
+ pub async fn list_weather_data_by_user(
+ &self,
+ user_id: i64,
+ limit: Option<i64>,
+ ) -> Result<Vec<WeatherApiData>, sqlx::Error> {
+ let limit = limit.unwrap_or(100);
+ sqlx::query_as::<_, WeatherApiData>(
+ "SELECT id, user_id, location_id, api_type, data, fetched_at FROM weather_api_data WHERE user_id = ? ORDER BY fetched_at DESC LIMIT ?"
+ )
+ .bind(user_id)
+ .bind(limit)
+ .fetch_all(self.db)
+ .await
+ }
+
+ #[allow(dead_code)]
+ pub async fn delete_old_weather_data(&self, days_old: i64) -> Result<u64, sqlx::Error> {
+ let result = sqlx::query(
+ "DELETE FROM weather_api_data WHERE fetched_at < datetime('now', '-{} days')",
+ )
+ .bind(days_old)
+ .execute(self.db)
+ .await?;
+ Ok(result.rows_affected())
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::locations::LocationRepository;
+ use crate::users::{UserRepository, UserRole};
+ use sqlx::{Executor, SqlitePool};
+
+ async fn setup_db() -> SqlitePool {
+ let pool = SqlitePool::connect(":memory:").await.unwrap();
+ pool.execute(
+ "CREATE TABLE users (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ user_id TEXT NOT NULL UNIQUE,
+ role TEXT NOT NULL DEFAULT 'user'
+ );",
+ )
+ .await
+ .unwrap();
+ pool.execute(
+ "CREATE TABLE locations (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ latitude REAL NOT NULL,
+ longitude REAL NOT NULL,
+ user_id INTEGER NOT NULL,
+ FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE NO ACTION
+ );",
+ )
+ .await
+ .unwrap();
+ pool.execute(
+ "CREATE TABLE weather_api_data (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ user_id INTEGER NOT NULL,
+ location_id INTEGER NOT NULL,
+ api_type TEXT NOT NULL DEFAULT 'openweathermap',
+ data TEXT NOT NULL,
+ fetched_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE,
+ FOREIGN KEY(location_id) REFERENCES locations(id) ON DELETE CASCADE
+ );",
+ )
+ .await
+ .unwrap();
+ pool
+ }
+
+ async fn create_user(pool: &SqlitePool) -> i64 {
+ let repo = UserRepository { db: pool };
+ let user = repo
+ .create_user(Some("test_user".to_string()), Some(UserRole::User))
+ .await
+ .unwrap();
+ user.id
+ }
+
+ async fn create_location(pool: &SqlitePool, user_id: i64) -> i64 {
+ let repo = LocationRepository { db: pool };
+ let location = repo
+ .create_location(60.1699, 24.9384, user_id) // Helsinki coordinates
+ .await
+ .unwrap();
+ location.id
+ }
+
+ #[tokio::test]
+ async fn test_create_and_get_weather_data() {
+ let pool = setup_db().await;
+ let user_id = create_user(&pool).await;
+ let location_id = create_location(&pool, user_id).await;
+
+ let repo = WeatherApiDataRepository { db: &pool };
+ let test_data = r#"{"temp": 20.5, "humidity": 65}"#.to_string();
+
+ let weather_data = repo
+ .create_weather_data(
+ user_id,
+ location_id,
+ "openweathermap".to_string(),
+ test_data.clone(),
+ )
+ .await
+ .unwrap();
+
+ assert_eq!(weather_data.user_id, user_id);
+ assert_eq!(weather_data.location_id, location_id);
+ assert_eq!(weather_data.api_type, "openweathermap");
+ assert_eq!(weather_data.data, test_data);
+
+ let latest = repo
+ .get_latest_weather_data(user_id, location_id, "openweathermap")
+ .await
+ .unwrap()
+ .unwrap();
+
+ assert_eq!(latest.id, weather_data.id);
+ assert_eq!(latest.data, test_data);
+ }
+
+ #[tokio::test]
+ async fn test_list_weather_data_by_user() {
+ let pool = setup_db().await;
+ let user_id = create_user(&pool).await;
+ let location_id = create_location(&pool, user_id).await;
+
+ let repo = WeatherApiDataRepository { db: &pool };
+ let test_data1 = r#"{"temp": 20.5}"#.to_string();
+ let test_data2 = r#"{"temp": 22.0}"#.to_string();
+
+ repo.create_weather_data(
+ user_id,
+ location_id,
+ "openweathermap".to_string(),
+ test_data1.clone(),
+ )
+ .await
+ .unwrap();
+ repo.create_weather_data(
+ user_id,
+ location_id,
+ "openweathermap".to_string(),
+ test_data2.clone(),
+ )
+ .await
+ .unwrap();
+
+ let data_list = repo
+ .list_weather_data_by_user(user_id, Some(10))
+ .await
+ .unwrap();
+ assert_eq!(data_list.len(), 2);
+ // Check that both data entries are present (order may vary)
+ let data_values: Vec<&str> = data_list.iter().map(|d| d.data.as_str()).collect();
+ assert!(data_values.contains(&test_data1.as_str()));
+ assert!(data_values.contains(&test_data2.as_str()));
+ }
+}
diff --git a/src/weather_poller.rs b/src/weather_poller.rs
index ea211b7..76f3fda 100644
--- a/src/weather_poller.rs
+++ b/src/weather_poller.rs
@@ -1 +1,502 @@
-// Remove all unused imports at the top of the file.
+use crate::{
+ notifications::NtfySettingsRepository,
+ users::{UserRepository, UserWithLocation},
+ weather_api_data::WeatherApiDataRepository,
+ weather_thresholds::WeatherThresholdRepository,
+};
+use anyhow::Result;
+use chrono::{DateTime, Utc};
+use reqwest::Client;
+use serde_json::Value;
+use sqlx::SqlitePool;
+use std::env;
+use std::sync::Arc;
+use tokio::time::{Duration, sleep};
+use tracing::{error, info, warn};
+
+const API_URL: &str = "https://api.openweathermap.org/data/2.5/forecast";
+const ALERT_WINDOW_HOURS: i64 = 24;
+
+pub struct WeatherPoller {
+ pool: Arc<SqlitePool>,
+ client: Client,
+}
+
+impl WeatherPoller {
+ pub fn new(pool: Arc<SqlitePool>) -> Self {
+ Self {
+ pool,
+ client: Client::new(),
+ }
+ }
+
+ /// Main function to check weather for all users with locations
+ pub async fn check_all(&self) -> Result<()> {
+ info!("🔄 Checking weather forecast for all users...");
+
+ let user_repo = UserRepository { db: &self.pool };
+ let users_with_locations = user_repo.list_users_with_locations().await?;
+
+ if users_with_locations.is_empty() {
+ info!("No users with locations found");
+ return Ok(());
+ }
+
+ info!("Found {} users with locations", users_with_locations.len());
+
+ // Spawn async tasks for each user-location combination
+ let mut handles = Vec::new();
+ for user_with_location in users_with_locations {
+ let poller = self.clone();
+ let handle =
+ tokio::spawn(async move { poller.check_user_weather(user_with_location).await });
+ handles.push(handle);
+ }
+
+ // Wait for all tasks to complete
+ for handle in handles {
+ if let Err(e) = handle.await {
+ error!("Task failed: {}", e);
+ }
+ }
+
+ Ok(())
+ }
+
+ /// Check weather for a specific user and location
+ async fn check_user_weather(&self, user_with_location: UserWithLocation) -> Result<()> {
+ let user_id = user_with_location.id;
+ let location_id = user_with_location.location_id;
+ let lat = user_with_location.latitude;
+ let lon = user_with_location.longitude;
+
+ info!(
+ "Checking weather for user {} at location {}",
+ user_id, location_id
+ );
+
+ // Round coordinates (similar to Elixir project)
+ let (rounded_lat, rounded_lon) = self.round_coordinates(lat, lon);
+
+ match self.fetch_forecast(rounded_lat, rounded_lon).await {
+ Ok(forecast_data) => {
+ // Store the weather data
+ let api_data_repo = WeatherApiDataRepository { db: &self.pool };
+ let json_data = serde_json::to_string(&forecast_data)?;
+
+ api_data_repo
+ .create_weather_data(
+ user_id,
+ location_id,
+ "openweathermap".to_string(),
+ json_data,
+ )
+ .await?;
+
+ info!(
+ "Weather data stored for user {} location {}",
+ user_id, location_id
+ );
+
+ // Check thresholds after storing data
+ self.check_thresholds(user_id, location_id, &forecast_data)
+ .await?;
+ }
+ Err(e) => {
+ error!("❌ Error fetching forecast for user {}: {}", user_id, e);
+ }
+ }
+
+ Ok(())
+ }
+
+ /// Round coordinates to reduce API calls for nearby locations
+ fn round_coordinates(&self, lat: f64, lon: f64) -> (f64, f64) {
+ // Round to 2 decimal places (approximately 1km precision)
+ let rounded_lat = (lat * 100.0).round() / 100.0;
+ let rounded_lon = (lon * 100.0).round() / 100.0;
+ (rounded_lat, rounded_lon)
+ }
+
+ /// Fetch weather forecast from OpenWeatherMap API
+ async fn fetch_forecast(&self, lat: f64, lon: f64) -> Result<Value> {
+ let api_key = env::var("OPENWEATHERMAP_API_KEY")
+ .map_err(|_| anyhow::anyhow!("OPENWEATHERMAP_API_KEY environment variable not set"))?;
+
+ let response = self
+ .client
+ .get(API_URL)
+ .query(&[
+ ("lat", lat.to_string()),
+ ("lon", lon.to_string()),
+ ("units", "metric".to_string()),
+ ("appid", api_key),
+ ])
+ .send()
+ .await?;
+
+ if !response.status().is_success() {
+ let status = response.status();
+ let body = response.text().await?;
+ return Err(anyhow::anyhow!("API request failed: {} - {}", status, body));
+ }
+
+ let data: Value = response.json().await?;
+ Ok(data)
+ }
+
+ /// Check weather thresholds and send notifications if needed
+ async fn check_thresholds(
+ &self,
+ user_id: i64,
+ _location_id: i64,
+ forecast_data: &Value,
+ ) -> Result<()> {
+ let threshold_repo = WeatherThresholdRepository { db: &self.pool };
+ let thresholds = threshold_repo.list_thresholds(user_id).await?;
+
+ if thresholds.is_empty() {
+ return Ok(());
+ }
+
+ // Get the forecast list from the API response
+ let forecast_list = forecast_data
+ .get("list")
+ .and_then(|v| v.as_array())
+ .ok_or_else(|| anyhow::anyhow!("Invalid forecast data structure"))?;
+
+ let now = Utc::now();
+
+ // Check each forecast entry within the alert window
+ for forecast_entry in forecast_list {
+ if let Some(timestamp) = forecast_entry.get("dt").and_then(|v| v.as_i64()) {
+ let forecast_time = DateTime::from_timestamp(timestamp, 0)
+ .ok_or_else(|| anyhow::anyhow!("Invalid timestamp"))?;
+
+ let hours_diff = (forecast_time - now).num_hours();
+ if hours_diff > ALERT_WINDOW_HOURS {
+ continue; // Skip forecasts beyond our alert window
+ }
+
+ // Check if any thresholds are exceeded
+ for threshold in &thresholds {
+ if !threshold.enabled {
+ continue;
+ }
+
+ if self.is_threshold_exceeded(forecast_entry, threshold)? {
+ info!(
+ "🚨 Threshold exceeded for user {}: {} {} {}",
+ user_id,
+ threshold.condition_type,
+ threshold.operator,
+ threshold.threshold_value
+ );
+
+ // Send notification
+ self.send_notification(user_id, threshold, forecast_entry)
+ .await?;
+
+ // Only send one notification per check to avoid spam
+ return Ok(());
+ }
+ }
+ }
+ }
+
+ Ok(())
+ }
+
+ /// Check if a specific threshold is exceeded
+ fn is_threshold_exceeded(
+ &self,
+ forecast_entry: &Value,
+ threshold: &crate::weather_thresholds::WeatherThreshold,
+ ) -> Result<bool> {
+ let value = match threshold.condition_type.as_str() {
+ "temperature" => forecast_entry
+ .get("main")
+ .and_then(|m| m.get("temp"))
+ .and_then(|t| t.as_f64())
+ .ok_or_else(|| anyhow::anyhow!("Temperature not found in forecast"))?,
+ "rain" => forecast_entry
+ .get("rain")
+ .and_then(|r| r.get("3h"))
+ .and_then(|r| r.as_f64())
+ .unwrap_or(0.0),
+ "wind_speed" => {
+ let wind_speed = forecast_entry
+ .get("wind")
+ .and_then(|w| w.get("speed"))
+ .and_then(|s| s.as_f64())
+ .ok_or_else(|| anyhow::anyhow!("Wind speed not found in forecast"))?;
+ // Convert m/s to km/h
+ wind_speed * 3.6
+ }
+ _ => {
+ warn!("Unknown condition type: {}", threshold.condition_type);
+ return Ok(false);
+ }
+ };
+
+ let threshold_value = threshold.threshold_value;
+ let exceeded = match threshold.operator.as_str() {
+ ">" => value > threshold_value,
+ ">=" => value >= threshold_value,
+ "<" => value < threshold_value,
+ "<=" => value <= threshold_value,
+ "==" => (value - threshold_value).abs() < 0.1, // Small tolerance for floating point
+ _ => {
+ warn!("Unknown operator: {}", threshold.operator);
+ false
+ }
+ };
+
+ if exceeded {
+ info!(
+ "🚨 {} threshold exceeded: {} {} {} (value: {})",
+ threshold.condition_type, value, threshold.operator, threshold_value, value
+ );
+ }
+
+ Ok(exceeded)
+ }
+
+ /// Send notification to user
+ async fn send_notification(
+ &self,
+ user_id: i64,
+ threshold: &crate::weather_thresholds::WeatherThreshold,
+ _forecast_entry: &Value,
+ ) -> Result<()> {
+ let ntfy_repo = NtfySettingsRepository { db: &self.pool };
+
+ if let Ok(Some(ntfy_settings)) = ntfy_repo.get_by_user(user_id).await {
+ if ntfy_settings.enabled {
+ // Create notification message
+ let title = format!(
+ "Weather Alert: {} {} {}",
+ threshold.condition_type, threshold.operator, threshold.threshold_value
+ );
+
+ let message = format!(
+ "Weather threshold exceeded for condition: {} {} {}",
+ threshold.condition_type, threshold.operator, threshold.threshold_value
+ );
+
+ // Send NTFY notification
+ self.send_ntfy_notification(&ntfy_settings, &title, &message)
+ .await?;
+ }
+ }
+
+ Ok(())
+ }
+
+ /// Send NTFY notification
+ async fn send_ntfy_notification(
+ &self,
+ ntfy_settings: &crate::notifications::NtfySettings,
+ title: &str,
+ message: &str,
+ ) -> Result<()> {
+ let url = format!("{}/{}", ntfy_settings.server_url, ntfy_settings.topic);
+
+ let response = self
+ .client
+ .post(&url)
+ .header("Title", title)
+ .header("Priority", ntfy_settings.priority.to_string())
+ .body(message.to_string())
+ .send()
+ .await?;
+
+ if response.status().is_success() {
+ info!("NTFY notification sent successfully");
+ } else {
+ error!("Failed to send NTFY notification: {}", response.status());
+ }
+
+ Ok(())
+ }
+}
+
+impl Clone for WeatherPoller {
+ fn clone(&self) -> Self {
+ Self {
+ pool: Arc::clone(&self.pool),
+ client: Client::new(),
+ }
+ }
+}
+
+/// Background task scheduler for weather polling
+pub struct WeatherScheduler {
+ poller: WeatherPoller,
+}
+
+impl WeatherScheduler {
+ pub fn new(pool: Arc<SqlitePool>) -> Self {
+ Self {
+ poller: WeatherPoller::new(pool),
+ }
+ }
+
+ /// Start the scheduler to run weather checks hourly
+ pub async fn start(&self) -> Result<()> {
+ info!("Starting weather scheduler...");
+
+ loop {
+ info!("Running scheduled weather check...");
+
+ if let Err(e) = self.poller.check_all().await {
+ error!("Weather check failed: {}", e);
+ }
+
+ // Wait for 1 hour before next check
+ sleep(Duration::from_secs(3600)).await;
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use sqlx::{Executor, SqlitePool};
+
+ #[allow(dead_code)]
+ async fn setup_db() -> SqlitePool {
+ let pool = SqlitePool::connect(":memory:").await.unwrap();
+ pool.execute(
+ "CREATE TABLE users (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ user_id TEXT NOT NULL UNIQUE,
+ role TEXT NOT NULL DEFAULT 'user'
+ );",
+ )
+ .await
+ .unwrap();
+ pool.execute(
+ "CREATE TABLE locations (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ latitude REAL NOT NULL,
+ longitude REAL NOT NULL,
+ user_id INTEGER NOT NULL,
+ FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE NO ACTION
+ );",
+ )
+ .await
+ .unwrap();
+ pool.execute(
+ "CREATE TABLE weather_api_data (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ user_id INTEGER NOT NULL,
+ location_id INTEGER NOT NULL,
+ api_type TEXT NOT NULL DEFAULT 'openweathermap',
+ data TEXT NOT NULL,
+ fetched_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
+ FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE,
+ FOREIGN KEY(location_id) REFERENCES locations(id) ON DELETE CASCADE
+ );",
+ )
+ .await
+ .unwrap();
+ pool.execute(
+ "CREATE TABLE weather_thresholds (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ user_id INTEGER NOT NULL,
+ condition_type TEXT NOT NULL,
+ threshold_value REAL NOT NULL,
+ operator TEXT NOT NULL,
+ enabled BOOLEAN NOT NULL DEFAULT 1,
+ description TEXT,
+ FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE
+ );",
+ )
+ .await
+ .unwrap();
+ pool.execute(
+ "CREATE TABLE user_notification_settings (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ user_id INTEGER NOT NULL,
+ ntfy_enabled BOOLEAN NOT NULL DEFAULT 0,
+ ntfy_topic TEXT,
+ ntfy_server_url TEXT,
+ ntfy_priority INTEGER NOT NULL DEFAULT 3,
+ ntfy_title_template TEXT,
+ ntfy_message_template TEXT,
+ smtp_enabled BOOLEAN NOT NULL DEFAULT 0,
+ smtp_email TEXT,
+ smtp_server TEXT,
+ smtp_port INTEGER,
+ smtp_username TEXT,
+ smtp_password TEXT,
+ smtp_use_tls BOOLEAN NOT NULL DEFAULT 1,
+ smtp_from_email TEXT,
+ smtp_from_name TEXT,
+ smtp_subject_template TEXT,
+ smtp_body_template TEXT,
+ FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE
+ );",
+ )
+ .await
+ .unwrap();
+ pool
+ }
+
+ #[tokio::test]
+ async fn test_round_coordinates() {
+ let pool = SqlitePool::connect(":memory:").await.unwrap();
+ let poller = WeatherPoller::new(Arc::new(pool));
+
+ let (lat, lon) = poller.round_coordinates(60.1699, 24.9384);
+ assert_eq!(lat, 60.17);
+ assert_eq!(lon, 24.94);
+
+ let (lat, lon) = poller.round_coordinates(60.1649, 24.9334);
+ assert_eq!(lat, 60.16);
+ assert_eq!(lon, 24.93);
+ }
+
+ #[tokio::test]
+ async fn test_is_threshold_exceeded() {
+ let pool = SqlitePool::connect(":memory:").await.unwrap();
+ let poller = WeatherPoller::new(Arc::new(pool));
+
+ let forecast_entry = serde_json::json!({
+ "main": {"temp": 25.0},
+ "wind": {"speed": 10.0},
+ "rain": {"3h": 30.0}
+ });
+
+ let threshold = crate::weather_thresholds::WeatherThreshold {
+ id: 1,
+ user_id: 1,
+ condition_type: "temperature".to_string(),
+ threshold_value: 20.0,
+ operator: ">".to_string(),
+ enabled: true,
+ description: None,
+ };
+
+ let exceeded = poller
+ .is_threshold_exceeded(&forecast_entry, &threshold)
+ .unwrap();
+ assert!(exceeded);
+
+ let threshold = crate::weather_thresholds::WeatherThreshold {
+ id: 1,
+ user_id: 1,
+ condition_type: "temperature".to_string(),
+ threshold_value: 30.0,
+ operator: ">".to_string(),
+ enabled: true,
+ description: None,
+ };
+
+ let exceeded = poller
+ .is_threshold_exceeded(&forecast_entry, &threshold)
+ .unwrap();
+ assert!(!exceeded);
+ }
+}