From 50ce8cb96b2b218751c2fc2a6b19372f51846acc Mon Sep 17 00:00:00 2001 From: Dawid Rycerz Date: Mon, 14 Jul 2025 19:34:59 +0300 Subject: feat: rewrite in rust --- src/health.rs | 6 + src/lib.rs | 31 +++++ src/locations.rs | 139 ++++++++++++++++++++++ src/notifications.rs | 294 ++++++++++++++++++++++++++++++++++++++++++++++ src/users.rs | 139 ++++++++++++++++++++++ src/weather_poller.rs | 192 ++++++++++++++++++++++++++++++ src/weather_thresholds.rs | 165 ++++++++++++++++++++++++++ 7 files changed, 966 insertions(+) create mode 100644 src/health.rs create mode 100644 src/lib.rs create mode 100644 src/locations.rs create mode 100644 src/notifications.rs create mode 100644 src/users.rs create mode 100644 src/weather_poller.rs create mode 100644 src/weather_thresholds.rs (limited to 'src') diff --git a/src/health.rs b/src/health.rs new file mode 100644 index 0000000..daa2f43 --- /dev/null +++ b/src/health.rs @@ -0,0 +1,6 @@ +use axum::{response::IntoResponse, Json}; +use serde_json::json; + +pub async fn health_handler() -> impl IntoResponse { + Json(json!({"status": "ok"})) +} \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..f51483b --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,31 @@ +use axum::{Router, routing::get}; + +pub fn app() -> Router { + Router::new() + .route("/health", get(crate::health::health_handler)) +} + +pub mod users; +pub mod locations; +pub mod weather_thresholds; +pub mod notifications; +pub mod weather_poller; +pub mod health; + +#[cfg(test)] +mod tests { + use super::*; + use axum::body::Body; + use axum::http::{Request, StatusCode}; + use tower::ServiceExt; // for `oneshot` + use axum::body::to_bytes; + + #[tokio::test] + async fn test_health_endpoint() { + let app = app(); + let response = app.oneshot(Request::builder().uri("/health").body(Body::empty()).unwrap()).await.unwrap(); + assert_eq!(response.status(), StatusCode::OK); + let body = to_bytes(response.into_body(), 1024).await.unwrap(); + assert_eq!(&body[..], b"{\"status\":\"ok\"}"); + } +} diff --git a/src/locations.rs b/src/locations.rs new file mode 100644 index 0000000..e5f881d --- /dev/null +++ b/src/locations.rs @@ -0,0 +1,139 @@ +use serde::{Deserialize, Serialize}; +use sqlx::FromRow; + +#[derive(Debug, Serialize, Deserialize, FromRow, Clone, PartialEq)] +pub struct Location { + pub id: i64, + pub latitude: f64, + pub longitude: f64, + pub user_id: i64, +} + +pub struct LocationRepository<'a> { + pub db: &'a sqlx::SqlitePool, +} + +impl<'a> LocationRepository<'a> { + pub async fn list_locations(&self) -> Result, sqlx::Error> { + sqlx::query_as::<_, Location>("SELECT id, latitude, longitude, user_id FROM locations") + .fetch_all(self.db) + .await + } + + pub async fn get_location(&self, id: i64) -> Result, sqlx::Error> { + sqlx::query_as::<_, Location>("SELECT id, latitude, longitude, user_id FROM locations WHERE id = ?") + .bind(id) + .fetch_optional(self.db) + .await + } + + pub async fn create_location(&self, latitude: f64, longitude: f64, user_id: i64) -> Result { + sqlx::query_as::<_, Location>( + "INSERT INTO locations (latitude, longitude, user_id) VALUES (?, ?, ?) RETURNING id, latitude, longitude, user_id" + ) + .bind(latitude) + .bind(longitude) + .bind(user_id) + .fetch_one(self.db) + .await + } + + pub async fn update_location(&self, id: i64, latitude: f64, longitude: f64) -> Result { + sqlx::query_as::<_, Location>( + "UPDATE locations SET latitude = ?, longitude = ? WHERE id = ? RETURNING id, latitude, longitude, user_id" + ) + .bind(latitude) + .bind(longitude) + .bind(id) + .fetch_one(self.db) + .await + } + + pub async fn delete_location(&self, id: i64) -> Result<(), sqlx::Error> { + sqlx::query("DELETE FROM locations WHERE id = ?") + .bind(id) + .execute(self.db) + .await?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::users::{UserRepository, UserRole}; + use sqlx::{SqlitePool, Executor}; + use tokio; + + async fn setup_db() -> SqlitePool { + let pool = SqlitePool::connect(":memory:").await.unwrap(); + pool.execute( + "CREATE TABLE users ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id TEXT NOT NULL UNIQUE, + role TEXT NOT NULL DEFAULT 'user' + );" + ).await.unwrap(); + pool.execute( + "CREATE TABLE locations ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + latitude REAL NOT NULL, + longitude REAL NOT NULL, + user_id INTEGER NOT NULL, + FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE NO ACTION + );" + ).await.unwrap(); + pool + } + + async fn create_user(pool: &SqlitePool) -> i64 { + let repo = UserRepository { db: pool }; + let user = repo.create_user(None, Some(UserRole::User)).await.unwrap(); + user.id + } + + #[tokio::test] + async fn test_create_and_get_location() { + let db = setup_db().await; + let user_id = create_user(&db).await; + let repo = LocationRepository { db: &db }; + let loc = repo.create_location(60.0, 24.0, user_id).await.unwrap(); + let fetched = repo.get_location(loc.id).await.unwrap().unwrap(); + assert_eq!(fetched.latitude, 60.0); + assert_eq!(fetched.longitude, 24.0); + assert_eq!(fetched.user_id, user_id); + } + + #[tokio::test] + async fn test_update_location() { + let db = setup_db().await; + let user_id = create_user(&db).await; + let repo = LocationRepository { db: &db }; + let loc = repo.create_location(60.0, 24.0, user_id).await.unwrap(); + let updated = repo.update_location(loc.id, 61.0, 25.0).await.unwrap(); + assert_eq!(updated.latitude, 61.0); + assert_eq!(updated.longitude, 25.0); + } + + #[tokio::test] + async fn test_delete_location() { + let db = setup_db().await; + let user_id = create_user(&db).await; + let repo = LocationRepository { db: &db }; + let loc = repo.create_location(60.0, 24.0, user_id).await.unwrap(); + repo.delete_location(loc.id).await.unwrap(); + let fetched = repo.get_location(loc.id).await.unwrap(); + assert!(fetched.is_none()); + } + + #[tokio::test] + async fn test_list_locations() { + let db = setup_db().await; + let user_id = create_user(&db).await; + let repo = LocationRepository { db: &db }; + repo.create_location(60.0, 24.0, user_id).await.unwrap(); + repo.create_location(61.0, 25.0, user_id).await.unwrap(); + let locations = repo.list_locations().await.unwrap(); + assert_eq!(locations.len(), 2); + } +} \ No newline at end of file diff --git a/src/notifications.rs b/src/notifications.rs new file mode 100644 index 0000000..fb49e97 --- /dev/null +++ b/src/notifications.rs @@ -0,0 +1,294 @@ +use serde::{Deserialize, Serialize}; +use sqlx::FromRow; + +#[derive(Debug, Serialize, Deserialize, FromRow, Clone, PartialEq)] +pub struct NtfySettings { + pub id: i64, + pub user_id: i64, + pub enabled: bool, + pub topic: String, + pub server_url: String, + pub priority: i32, + pub title_template: Option, + pub message_template: Option, +} + +#[derive(Debug, Serialize, Deserialize, FromRow, Clone, PartialEq)] +pub struct SmtpSettings { + pub id: i64, + pub user_id: i64, + pub enabled: bool, + pub email: String, + pub smtp_server: String, + pub smtp_port: i32, + pub username: Option, + pub password: Option, + pub use_tls: bool, + pub from_email: Option, + pub from_name: Option, + pub subject_template: Option, + pub body_template: Option, +} + +pub struct NtfySettingsRepository<'a> { + pub db: &'a sqlx::SqlitePool, +} + +impl<'a> NtfySettingsRepository<'a> { + pub async fn get_by_user(&self, user_id: i64) -> Result, sqlx::Error> { + sqlx::query_as::<_, NtfySettings>( + "SELECT * FROM user_ntfy_settings WHERE user_id = ?" + ) + .bind(user_id) + .fetch_optional(self.db) + .await + } + + pub async fn create(&self, user_id: i64, enabled: bool, topic: String, server_url: String, priority: i32, title_template: Option, message_template: Option) -> Result { + sqlx::query_as::<_, NtfySettings>( + "INSERT INTO user_ntfy_settings (user_id, enabled, topic, server_url, priority, title_template, message_template) VALUES (?, ?, ?, ?, ?, ?, ?) RETURNING *" + ) + .bind(user_id) + .bind(enabled) + .bind(topic) + .bind(server_url) + .bind(priority) + .bind(title_template) + .bind(message_template) + .fetch_one(self.db) + .await + } + + pub async fn update(&self, id: i64, enabled: bool, topic: String, server_url: String, priority: i32, title_template: Option, message_template: Option) -> Result { + sqlx::query_as::<_, NtfySettings>( + "UPDATE user_ntfy_settings SET enabled = ?, topic = ?, server_url = ?, priority = ?, title_template = ?, message_template = ? WHERE id = ? RETURNING *" + ) + .bind(enabled) + .bind(topic) + .bind(server_url) + .bind(priority) + .bind(title_template) + .bind(message_template) + .bind(id) + .fetch_one(self.db) + .await + } + + pub async fn delete(&self, id: i64) -> Result<(), sqlx::Error> { + sqlx::query("DELETE FROM user_ntfy_settings WHERE id = ?") + .bind(id) + .execute(self.db) + .await?; + Ok(()) + } +} + +pub struct SmtpSettingsRepository<'a> { + pub db: &'a sqlx::SqlitePool, +} + +impl<'a> SmtpSettingsRepository<'a> { + pub async fn get_by_user(&self, user_id: i64) -> Result, sqlx::Error> { + sqlx::query_as::<_, SmtpSettings>( + "SELECT * FROM user_smtp_settings WHERE user_id = ?" + ) + .bind(user_id) + .fetch_optional(self.db) + .await + } + + pub async fn create(&self, user_id: i64, enabled: bool, email: String, smtp_server: String, smtp_port: i32, username: Option, password: Option, use_tls: bool, from_email: Option, from_name: Option, subject_template: Option, body_template: Option) -> Result { + sqlx::query_as::<_, SmtpSettings>( + "INSERT INTO user_smtp_settings (user_id, enabled, email, smtp_server, smtp_port, username, password, use_tls, from_email, from_name, subject_template, body_template) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) RETURNING *" + ) + .bind(user_id) + .bind(enabled) + .bind(email) + .bind(smtp_server) + .bind(smtp_port) + .bind(username) + .bind(password) + .bind(use_tls) + .bind(from_email) + .bind(from_name) + .bind(subject_template) + .bind(body_template) + .fetch_one(self.db) + .await + } + + pub async fn update(&self, id: i64, enabled: bool, email: String, smtp_server: String, smtp_port: i32, username: Option, password: Option, use_tls: bool, from_email: Option, from_name: Option, subject_template: Option, body_template: Option) -> Result { + sqlx::query_as::<_, SmtpSettings>( + "UPDATE user_smtp_settings SET enabled = ?, email = ?, smtp_server = ?, smtp_port = ?, username = ?, password = ?, use_tls = ?, from_email = ?, from_name = ?, subject_template = ?, body_template = ? WHERE id = ? RETURNING *" + ) + .bind(enabled) + .bind(email) + .bind(smtp_server) + .bind(smtp_port) + .bind(username) + .bind(password) + .bind(use_tls) + .bind(from_email) + .bind(from_name) + .bind(subject_template) + .bind(body_template) + .bind(id) + .fetch_one(self.db) + .await + } + + pub async fn delete(&self, id: i64) -> Result<(), sqlx::Error> { + sqlx::query("DELETE FROM user_smtp_settings WHERE id = ?") + .bind(id) + .execute(self.db) + .await?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::users::{UserRepository, UserRole}; + use sqlx::{SqlitePool, Executor}; + use tokio; + + async fn setup_db() -> SqlitePool { + let pool = SqlitePool::connect(":memory:").await.unwrap(); + pool.execute( + "CREATE TABLE users ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id TEXT NOT NULL UNIQUE, + role TEXT NOT NULL DEFAULT 'user' + );" + ).await.unwrap(); + pool.execute( + "CREATE TABLE user_ntfy_settings ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id INTEGER NOT NULL, + enabled BOOLEAN NOT NULL DEFAULT 0, + topic TEXT NOT NULL, + server_url TEXT NOT NULL, + priority INTEGER NOT NULL DEFAULT 5, + title_template TEXT, + message_template TEXT, + FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE + );" + ).await.unwrap(); + pool.execute( + "CREATE TABLE user_smtp_settings ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id INTEGER NOT NULL, + enabled BOOLEAN NOT NULL DEFAULT 0, + email TEXT NOT NULL, + smtp_server TEXT NOT NULL, + smtp_port INTEGER NOT NULL, + username TEXT, + password TEXT, + use_tls BOOLEAN NOT NULL DEFAULT 1, + from_email TEXT, + from_name TEXT DEFAULT 'Silmätaivas Alerts', + subject_template TEXT, + body_template TEXT, + FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE + );" + ).await.unwrap(); + pool + } + + async fn create_user(pool: &SqlitePool) -> i64 { + let repo = UserRepository { db: pool }; + let user = repo.create_user(None, Some(UserRole::User)).await.unwrap(); + user.id + } + + #[tokio::test] + async fn test_create_and_get_ntfy_settings() { + let db = setup_db().await; + let user_id = create_user(&db).await; + let repo = NtfySettingsRepository { db: &db }; + let settings = repo.create(user_id, true, "topic1".to_string(), "https://ntfy.sh".to_string(), 3, Some("title".to_string()), Some("msg".to_string())).await.unwrap(); + let fetched = repo.get_by_user(user_id).await.unwrap().unwrap(); + assert_eq!(fetched.topic, "topic1"); + assert_eq!(fetched.server_url, "https://ntfy.sh"); + assert_eq!(fetched.priority, 3); + assert_eq!(fetched.title_template, Some("title".to_string())); + assert_eq!(fetched.message_template, Some("msg".to_string())); + } + + #[tokio::test] + async fn test_update_ntfy_settings() { + let db = setup_db().await; + let user_id = create_user(&db).await; + let repo = NtfySettingsRepository { db: &db }; + let settings = repo.create(user_id, true, "topic1".to_string(), "https://ntfy.sh".to_string(), 3, None, None).await.unwrap(); + let updated = repo.update(settings.id, false, "topic2".to_string(), "https://ntfy2.sh".to_string(), 4, Some("t2".to_string()), Some("m2".to_string())).await.unwrap(); + assert_eq!(updated.enabled, false); + assert_eq!(updated.topic, "topic2"); + assert_eq!(updated.server_url, "https://ntfy2.sh"); + assert_eq!(updated.priority, 4); + assert_eq!(updated.title_template, Some("t2".to_string())); + assert_eq!(updated.message_template, Some("m2".to_string())); + } + + #[tokio::test] + async fn test_delete_ntfy_settings() { + let db = setup_db().await; + let user_id = create_user(&db).await; + let repo = NtfySettingsRepository { db: &db }; + let settings = repo.create(user_id, true, "topic1".to_string(), "https://ntfy.sh".to_string(), 3, None, None).await.unwrap(); + repo.delete(settings.id).await.unwrap(); + let fetched = repo.get_by_user(user_id).await.unwrap(); + assert!(fetched.is_none()); + } + + #[tokio::test] + async fn test_create_and_get_smtp_settings() { + let db = setup_db().await; + let user_id = create_user(&db).await; + let repo = SmtpSettingsRepository { db: &db }; + let settings = repo.create(user_id, true, "test@example.com".to_string(), "smtp.example.com".to_string(), 587, Some("user".to_string()), Some("pass".to_string()), true, Some("from@example.com".to_string()), Some("Alerts".to_string()), Some("subj".to_string()), Some("body".to_string())).await.unwrap(); + let fetched = repo.get_by_user(user_id).await.unwrap().unwrap(); + assert_eq!(fetched.email, "test@example.com"); + assert_eq!(fetched.smtp_server, "smtp.example.com"); + assert_eq!(fetched.smtp_port, 587); + assert_eq!(fetched.username, Some("user".to_string())); + assert_eq!(fetched.password, Some("pass".to_string())); + assert_eq!(fetched.use_tls, true); + assert_eq!(fetched.from_email, Some("from@example.com".to_string())); + assert_eq!(fetched.from_name, Some("Alerts".to_string())); + assert_eq!(fetched.subject_template, Some("subj".to_string())); + assert_eq!(fetched.body_template, Some("body".to_string())); + } + + #[tokio::test] + async fn test_update_smtp_settings() { + let db = setup_db().await; + let user_id = create_user(&db).await; + let repo = SmtpSettingsRepository { db: &db }; + let settings = repo.create(user_id, true, "test@example.com".to_string(), "smtp.example.com".to_string(), 587, None, None, true, None, None, None, None).await.unwrap(); + let updated = repo.update(settings.id, false, "other@example.com".to_string(), "smtp2.example.com".to_string(), 465, Some("u2".to_string()), Some("p2".to_string()), false, Some("f2@example.com".to_string()), Some("N2".to_string()), Some("s2".to_string()), Some("b2".to_string())).await.unwrap(); + assert_eq!(updated.enabled, false); + assert_eq!(updated.email, "other@example.com"); + assert_eq!(updated.smtp_server, "smtp2.example.com"); + assert_eq!(updated.smtp_port, 465); + assert_eq!(updated.username, Some("u2".to_string())); + assert_eq!(updated.password, Some("p2".to_string())); + assert_eq!(updated.use_tls, false); + assert_eq!(updated.from_email, Some("f2@example.com".to_string())); + assert_eq!(updated.from_name, Some("N2".to_string())); + assert_eq!(updated.subject_template, Some("s2".to_string())); + assert_eq!(updated.body_template, Some("b2".to_string())); + } + + #[tokio::test] + async fn test_delete_smtp_settings() { + let db = setup_db().await; + let user_id = create_user(&db).await; + let repo = SmtpSettingsRepository { db: &db }; + let settings = repo.create(user_id, true, "test@example.com".to_string(), "smtp.example.com".to_string(), 587, None, None, true, None, None, None, None).await.unwrap(); + repo.delete(settings.id).await.unwrap(); + let fetched = repo.get_by_user(user_id).await.unwrap(); + assert!(fetched.is_none()); + } +} \ No newline at end of file diff --git a/src/users.rs b/src/users.rs new file mode 100644 index 0000000..baca6dd --- /dev/null +++ b/src/users.rs @@ -0,0 +1,139 @@ +use serde::{Deserialize, Serialize}; +use sqlx::FromRow; +use uuid::Uuid; + +#[derive(Debug, Serialize, Deserialize, FromRow, Clone, PartialEq, Eq)] +pub struct User { + pub id: i64, + pub user_id: String, // API token + pub role: UserRole, +} + +#[derive(Debug, Serialize, Deserialize, sqlx::Type, Clone, PartialEq, Eq)] +#[sqlx(type_name = "TEXT")] +pub enum UserRole { + #[serde(rename = "user")] + User, + #[serde(rename = "admin")] + Admin, +} + +impl Default for UserRole { + fn default() -> Self { + UserRole::User + } +} + +pub struct UserRepository<'a> { + pub db: &'a sqlx::SqlitePool, +} + +impl<'a> UserRepository<'a> { + pub async fn list_users(&self) -> Result, sqlx::Error> { + sqlx::query_as::<_, User>("SELECT id, user_id, role FROM users") + .fetch_all(self.db) + .await + } + + pub async fn get_user_by_id(&self, id: i64) -> Result, sqlx::Error> { + sqlx::query_as::<_, User>("SELECT id, user_id, role FROM users WHERE id = ?") + .bind(id) + .fetch_optional(self.db) + .await + } + + pub async fn get_user_by_user_id(&self, user_id: &str) -> Result, sqlx::Error> { + sqlx::query_as::<_, User>("SELECT id, user_id, role FROM users WHERE user_id = ?") + .bind(user_id) + .fetch_optional(self.db) + .await + } + + pub async fn create_user(&self, user_id: Option, role: Option) -> Result { + let user_id = user_id.unwrap_or_else(|| Uuid::new_v4().to_string()); + let role = role.unwrap_or_default(); + sqlx::query_as::<_, User>( + "INSERT INTO users (user_id, role) VALUES (?, ?) RETURNING id, user_id, role" + ) + .bind(user_id) + .bind(role) + .fetch_one(self.db) + .await + } + + pub async fn update_user(&self, id: i64, role: UserRole) -> Result { + sqlx::query_as::<_, User>( + "UPDATE users SET role = ? WHERE id = ? RETURNING id, user_id, role" + ) + .bind(role) + .bind(id) + .fetch_one(self.db) + .await + } + + pub async fn delete_user(&self, id: i64) -> Result<(), sqlx::Error> { + sqlx::query("DELETE FROM users WHERE id = ?") + .bind(id) + .execute(self.db) + .await?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use sqlx::{SqlitePool, Executor}; + use tokio; + + async fn setup_db() -> SqlitePool { + let pool = SqlitePool::connect(":memory:").await.unwrap(); + pool.execute( + "CREATE TABLE users ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id TEXT NOT NULL UNIQUE, + role TEXT NOT NULL DEFAULT 'user' + );" + ).await.unwrap(); + pool + } + + #[tokio::test] + async fn test_create_and_get_user() { + let db = setup_db().await; + let repo = UserRepository { db: &db }; + let user = repo.create_user(None, Some(UserRole::Admin)).await.unwrap(); + assert_eq!(user.role, UserRole::Admin); + let fetched = repo.get_user_by_user_id(&user.user_id).await.unwrap().unwrap(); + assert_eq!(fetched.user_id, user.user_id); + } + + #[tokio::test] + async fn test_update_user() { + let db = setup_db().await; + let repo = UserRepository { db: &db }; + let user = repo.create_user(None, Some(UserRole::User)).await.unwrap(); + let updated = repo.update_user(user.id, UserRole::Admin).await.unwrap(); + assert_eq!(updated.role, UserRole::Admin); + } + + #[tokio::test] + async fn test_delete_user() { + let db = setup_db().await; + let repo = UserRepository { db: &db }; + let user = repo.create_user(None, None).await.unwrap(); + repo.delete_user(user.id).await.unwrap(); + let fetched = repo.get_user_by_id(user.id).await.unwrap(); + assert!(fetched.is_none()); + } + + #[tokio::test] + async fn test_list_users() { + let db = setup_db().await; + let repo = UserRepository { db: &db }; + repo.create_user(None, Some(UserRole::User)).await.unwrap(); + repo.create_user(None, Some(UserRole::Admin)).await.unwrap(); + let users = repo.list_users().await.unwrap(); + assert_eq!(users.len(), 2); + } +} \ No newline at end of file diff --git a/src/weather_poller.rs b/src/weather_poller.rs new file mode 100644 index 0000000..056cef8 --- /dev/null +++ b/src/weather_poller.rs @@ -0,0 +1,192 @@ +use crate::users::UserRepository; +use crate::locations::LocationRepository; +use crate::weather_thresholds::{WeatherThresholdRepository, WeatherThreshold}; +use crate::notifications::{NtfySettingsRepository, SmtpSettingsRepository, NtfySettings, SmtpSettings}; +use serde_json::Value; +use tera::{Tera, Context}; +use reqwest::Client; +use std::sync::{Arc, Mutex}; +use tokio_task_scheduler::{Scheduler, Task}; +use tokio::time::Duration; + +const OWM_API_URL: &str = "https://api.openweathermap.org/data/2.5/forecast"; +const ALERT_WINDOW_HOURS: i64 = 24; + +pub struct WeatherPoller { + pub db: Arc, + pub owm_api_key: String, + pub tera: Arc>, +} + +impl WeatherPoller { + pub async fn check_all(&self) { + let user_repo = UserRepository { db: &self.db }; + let loc_repo = LocationRepository { db: &self.db }; + let users = user_repo.list_users().await.unwrap_or_default(); + for user in users { + if let Some(location) = loc_repo.list_locations().await.unwrap_or_default().into_iter().find(|l| l.user_id == user.id) { + self.check_user_weather(user.id, location.latitude, location.longitude).await; + } + } + } + + pub async fn check_user_weather(&self, user_id: i64, lat: f64, lon: f64) { + if let Ok(Some(forecast)) = self.fetch_forecast(lat, lon).await { + let threshold_repo = WeatherThresholdRepository { db: &self.db }; + let thresholds = threshold_repo.list_thresholds(user_id).await.unwrap_or_default().into_iter().filter(|t| t.enabled).collect::>(); + if let Some(entry) = find_first_alert_entry(&forecast, &thresholds) { + self.send_notifications(user_id, &entry).await; + } + } + } + + pub async fn fetch_forecast(&self, lat: f64, lon: f64) -> Result>, reqwest::Error> { + let client = Client::new(); + let resp = client.get(OWM_API_URL) + .query(&[ + ("lat", lat.to_string()), + ("lon", lon.to_string()), + ("units", "metric".to_string()), + ("appid", self.owm_api_key.clone()), + ]) + .send() + .await?; + let json: Value = resp.json().await?; + Ok(json["list"].as_array().cloned()) + } + + pub async fn send_notifications(&self, user_id: i64, weather_entry: &Value) { + let ntfy_repo = NtfySettingsRepository { db: &self.db }; + let smtp_repo = SmtpSettingsRepository { db: &self.db }; + let tera = self.tera.clone(); + if let Some(ntfy) = ntfy_repo.get_by_user(user_id).await.unwrap_or(None) { + if ntfy.enabled { + send_ntfy_notification(&ntfy, weather_entry, tera.clone()).await; + } + } + if let Some(smtp) = smtp_repo.get_by_user(user_id).await.unwrap_or(None) { + if smtp.enabled { + send_smtp_notification(&smtp, weather_entry, tera.clone()).await; + } + } + } +} + +fn find_first_alert_entry(forecast: &[Value], thresholds: &[WeatherThreshold]) -> Option { + use chrono::{Utc, TimeZone}; + let now = Utc::now(); + for entry in forecast { + if let Some(ts) = entry["dt"].as_i64() { + let forecast_time = Utc.timestamp_opt(ts, 0).single()?; + if (forecast_time - now).num_hours() > ALERT_WINDOW_HOURS { + break; + } + if thresholds.iter().any(|t| threshold_triggered(t, entry)) { + return Some(entry.clone()); + } + } + } + None +} + +fn threshold_triggered(threshold: &WeatherThreshold, entry: &Value) -> bool { + let value = match threshold.condition_type.as_str() { + "wind_speed" => entry.pointer("/wind/speed").and_then(|v| v.as_f64()).unwrap_or(0.0) * 3.6, + "rain" => entry.pointer("/rain/3h").and_then(|v| v.as_f64()).unwrap_or(0.0), + "temp_min" | "temp_max" => entry.pointer("/main/temp").and_then(|v| v.as_f64()).unwrap_or(0.0), + _ => return false, + }; + compare(value, &threshold.operator, threshold.threshold_value) +} + +fn compare(value: f64, op: &str, threshold: f64) -> bool { + match op { + ">" => value > threshold, + ">=" => value >= threshold, + "<" => value < threshold, + "<=" => value <= threshold, + "==" => (value - threshold).abs() < std::f64::EPSILON, + _ => false, + } +} + +async fn send_ntfy_notification(ntfy: &NtfySettings, weather_entry: &Value, tera: Arc>) { + let mut ctx = Context::new(); + add_weather_context(&mut ctx, weather_entry); + let title = if let Some(tpl) = &ntfy.title_template { + let mut tera = tera.lock().unwrap(); + tera.render_str(tpl, &ctx).unwrap_or_else(|_| "🚨 Weather Alert".to_string()) + } else { + "🚨 Weather Alert".to_string() + }; + let message = if let Some(tpl) = &ntfy.message_template { + let mut tera = tera.lock().unwrap(); + tera.render_str(tpl, &ctx).unwrap_or_else(|_| "🚨 Weather alert for your location".to_string()) + } else { + default_weather_message(weather_entry) + }; + let client = Client::new(); + let _ = client.post(&format!("{}/{}", ntfy.server_url, ntfy.topic)) + .header("Priority", ntfy.priority.to_string()) + .header("Title", title) + .body(message) + .send() + .await; +} + +async fn send_smtp_notification(smtp: &SmtpSettings, weather_entry: &Value, tera: Arc>) { + use lettre::{Message, SmtpTransport, Transport, transport::smtp::authentication::Credentials}; + let mut ctx = Context::new(); + add_weather_context(&mut ctx, weather_entry); + let subject = if let Some(tpl) = &smtp.subject_template { + let mut tera = tera.lock().unwrap(); + tera.render_str(tpl, &ctx).unwrap_or_else(|_| "⚠️ Weather Alert for Your Location".to_string()) + } else { + "⚠️ Weather Alert for Your Location".to_string() + }; + let body = if let Some(tpl) = &smtp.body_template { + let mut tera = tera.lock().unwrap(); + tera.render_str(tpl, &ctx).unwrap_or_else(|_| default_weather_message(weather_entry)) + } else { + default_weather_message(weather_entry) + }; + let from = smtp.from_email.clone().unwrap_or_else(|| smtp.email.clone()); + let from_name = smtp.from_name.clone().unwrap_or_else(|| "Silmätaivas Alerts".to_string()); + let email = Message::builder() + .from(format!("{} <{}>", from_name, from).parse().unwrap()) + .to(smtp.email.parse().unwrap()) + .subject(subject) + .body(body) + .unwrap(); + let creds = smtp.username.as_ref().and_then(|u| smtp.password.as_ref().map(|p| Credentials::new(u.clone(), p.clone()))); + let mailer = if let Some(creds) = creds { + SmtpTransport::relay(&smtp.smtp_server).unwrap() + .port(smtp.smtp_port as u16) + .credentials(creds) + .build() + } else { + SmtpTransport::relay(&smtp.smtp_server).unwrap() + .port(smtp.smtp_port as u16) + .build() + }; + let _ = mailer.send(&email); +} + +fn add_weather_context(ctx: &mut Context, entry: &Value) { + ctx.insert("temp", &entry.pointer("/main/temp").and_then(|v| v.as_f64()).unwrap_or(0.0)); + ctx.insert("wind_speed", &(entry.pointer("/wind/speed").and_then(|v| v.as_f64()).unwrap_or(0.0) * 3.6)); + ctx.insert("rain", &entry.pointer("/rain/3h").and_then(|v| v.as_f64()).unwrap_or(0.0)); + ctx.insert("time", &entry["dt_txt"].as_str().unwrap_or("N/A")); + ctx.insert("humidity", &entry.pointer("/main/humidity").and_then(|v| v.as_f64()).unwrap_or(0.0)); + ctx.insert("pressure", &entry.pointer("/main/pressure").and_then(|v| v.as_f64()).unwrap_or(0.0)); +} + +fn default_weather_message(entry: &Value) -> String { + let temp = entry.pointer("/main/temp").and_then(|v| v.as_f64()).unwrap_or(0.0); + let wind = entry.pointer("/wind/speed").and_then(|v| v.as_f64()).unwrap_or(0.0) * 3.6; + let rain = entry.pointer("/rain/3h").and_then(|v| v.as_f64()).unwrap_or(0.0); + let time = entry["dt_txt"].as_str().unwrap_or("N/A"); + format!("🚨 Weather alert for your location ({}):\n\n🌬️ Wind: {:.1} km/h\n🌧️ Rain: {:.1} mm\n🌡️ Temperature: {:.1} °C\n\nStay safe,\n— Silmätaivas", time, wind, rain, temp) +} + +// Unit tests for threshold logic and template rendering can be added here. \ No newline at end of file diff --git a/src/weather_thresholds.rs b/src/weather_thresholds.rs new file mode 100644 index 0000000..bfb9cdf --- /dev/null +++ b/src/weather_thresholds.rs @@ -0,0 +1,165 @@ +use serde::{Deserialize, Serialize}; +use sqlx::FromRow; + +#[derive(Debug, Serialize, Deserialize, FromRow, Clone, PartialEq)] +pub struct WeatherThreshold { + pub id: i64, + pub user_id: i64, + pub condition_type: String, + pub threshold_value: f64, + pub operator: String, + pub enabled: bool, + pub description: Option, +} + +pub struct WeatherThresholdRepository<'a> { + pub db: &'a sqlx::SqlitePool, +} + +impl<'a> WeatherThresholdRepository<'a> { + pub async fn list_thresholds(&self, user_id: i64) -> Result, sqlx::Error> { + sqlx::query_as::<_, WeatherThreshold>( + "SELECT id, user_id, condition_type, threshold_value, operator, enabled, description FROM weather_thresholds WHERE user_id = ?" + ) + .bind(user_id) + .fetch_all(self.db) + .await + } + + pub async fn get_threshold(&self, id: i64, user_id: i64) -> Result, sqlx::Error> { + sqlx::query_as::<_, WeatherThreshold>( + "SELECT id, user_id, condition_type, threshold_value, operator, enabled, description FROM weather_thresholds WHERE id = ? AND user_id = ?" + ) + .bind(id) + .bind(user_id) + .fetch_optional(self.db) + .await + } + + pub async fn create_threshold(&self, user_id: i64, condition_type: String, threshold_value: f64, operator: String, enabled: bool, description: Option) -> Result { + sqlx::query_as::<_, WeatherThreshold>( + "INSERT INTO weather_thresholds (user_id, condition_type, threshold_value, operator, enabled, description) VALUES (?, ?, ?, ?, ?, ?) RETURNING id, user_id, condition_type, threshold_value, operator, enabled, description" + ) + .bind(user_id) + .bind(condition_type) + .bind(threshold_value) + .bind(operator) + .bind(enabled) + .bind(description) + .fetch_one(self.db) + .await + } + + pub async fn update_threshold(&self, id: i64, user_id: i64, condition_type: String, threshold_value: f64, operator: String, enabled: bool, description: Option) -> Result { + sqlx::query_as::<_, WeatherThreshold>( + "UPDATE weather_thresholds SET condition_type = ?, threshold_value = ?, operator = ?, enabled = ?, description = ? WHERE id = ? AND user_id = ? RETURNING id, user_id, condition_type, threshold_value, operator, enabled, description" + ) + .bind(condition_type) + .bind(threshold_value) + .bind(operator) + .bind(enabled) + .bind(description) + .bind(id) + .bind(user_id) + .fetch_one(self.db) + .await + } + + pub async fn delete_threshold(&self, id: i64, user_id: i64) -> Result<(), sqlx::Error> { + sqlx::query("DELETE FROM weather_thresholds WHERE id = ? AND user_id = ?") + .bind(id) + .bind(user_id) + .execute(self.db) + .await?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::users::{UserRepository, UserRole}; + use sqlx::{SqlitePool, Executor}; + use tokio; + + async fn setup_db() -> SqlitePool { + let pool = SqlitePool::connect(":memory:").await.unwrap(); + pool.execute( + "CREATE TABLE users ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id TEXT NOT NULL UNIQUE, + role TEXT NOT NULL DEFAULT 'user' + );" + ).await.unwrap(); + pool.execute( + "CREATE TABLE weather_thresholds ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_id INTEGER NOT NULL, + condition_type TEXT NOT NULL, + threshold_value REAL NOT NULL, + operator TEXT NOT NULL, + enabled BOOLEAN NOT NULL DEFAULT 1, + description TEXT, + FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE + );" + ).await.unwrap(); + pool + } + + async fn create_user(pool: &SqlitePool) -> i64 { + let repo = UserRepository { db: pool }; + let user = repo.create_user(None, Some(UserRole::User)).await.unwrap(); + user.id + } + + #[tokio::test] + async fn test_create_and_get_threshold() { + let db = setup_db().await; + let user_id = create_user(&db).await; + let repo = WeatherThresholdRepository { db: &db }; + let th = repo.create_threshold(user_id, "wind_speed".to_string(), 10.0, ">".to_string(), true, Some("desc".to_string())).await.unwrap(); + let fetched = repo.get_threshold(th.id, user_id).await.unwrap().unwrap(); + assert_eq!(fetched.condition_type, "wind_speed"); + assert_eq!(fetched.threshold_value, 10.0); + assert_eq!(fetched.operator, ">" + ); + assert_eq!(fetched.enabled, true); + assert_eq!(fetched.description, Some("desc".to_string())); + } + + #[tokio::test] + async fn test_update_threshold() { + let db = setup_db().await; + let user_id = create_user(&db).await; + let repo = WeatherThresholdRepository { db: &db }; + let th = repo.create_threshold(user_id, "wind_speed".to_string(), 10.0, ">".to_string(), true, None).await.unwrap(); + let updated = repo.update_threshold(th.id, user_id, "rain".to_string(), 5.0, "<".to_string(), false, Some("rain desc".to_string())).await.unwrap(); + assert_eq!(updated.condition_type, "rain"); + assert_eq!(updated.threshold_value, 5.0); + assert_eq!(updated.operator, "<"); + assert_eq!(updated.enabled, false); + assert_eq!(updated.description, Some("rain desc".to_string())); + } + + #[tokio::test] + async fn test_delete_threshold() { + let db = setup_db().await; + let user_id = create_user(&db).await; + let repo = WeatherThresholdRepository { db: &db }; + let th = repo.create_threshold(user_id, "wind_speed".to_string(), 10.0, ">".to_string(), true, None).await.unwrap(); + repo.delete_threshold(th.id, user_id).await.unwrap(); + let fetched = repo.get_threshold(th.id, user_id).await.unwrap(); + assert!(fetched.is_none()); + } + + #[tokio::test] + async fn test_list_thresholds() { + let db = setup_db().await; + let user_id = create_user(&db).await; + let repo = WeatherThresholdRepository { db: &db }; + repo.create_threshold(user_id, "wind_speed".to_string(), 10.0, ">".to_string(), true, None).await.unwrap(); + repo.create_threshold(user_id, "rain".to_string(), 5.0, "<".to_string(), false, None).await.unwrap(); + let thresholds = repo.list_thresholds(user_id).await.unwrap(); + assert_eq!(thresholds.len(), 2); + } +} \ No newline at end of file -- cgit v1.2.3