summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/health.rs6
-rw-r--r--src/lib.rs31
-rw-r--r--src/locations.rs139
-rw-r--r--src/notifications.rs294
-rw-r--r--src/users.rs139
-rw-r--r--src/weather_poller.rs192
-rw-r--r--src/weather_thresholds.rs165
7 files changed, 966 insertions, 0 deletions
diff --git a/src/health.rs b/src/health.rs
new file mode 100644
index 0000000..daa2f43
--- /dev/null
+++ b/src/health.rs
@@ -0,0 +1,6 @@
+use axum::{response::IntoResponse, Json};
+use serde_json::json;
+
+pub async fn health_handler() -> impl IntoResponse {
+ Json(json!({"status": "ok"}))
+} \ No newline at end of file
diff --git a/src/lib.rs b/src/lib.rs
new file mode 100644
index 0000000..f51483b
--- /dev/null
+++ b/src/lib.rs
@@ -0,0 +1,31 @@
+use axum::{Router, routing::get};
+
+pub fn app() -> Router {
+ Router::new()
+ .route("/health", get(crate::health::health_handler))
+}
+
+pub mod users;
+pub mod locations;
+pub mod weather_thresholds;
+pub mod notifications;
+pub mod weather_poller;
+pub mod health;
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use axum::body::Body;
+ use axum::http::{Request, StatusCode};
+ use tower::ServiceExt; // for `oneshot`
+ use axum::body::to_bytes;
+
+ #[tokio::test]
+ async fn test_health_endpoint() {
+ let app = app();
+ let response = app.oneshot(Request::builder().uri("/health").body(Body::empty()).unwrap()).await.unwrap();
+ assert_eq!(response.status(), StatusCode::OK);
+ let body = to_bytes(response.into_body(), 1024).await.unwrap();
+ assert_eq!(&body[..], b"{\"status\":\"ok\"}");
+ }
+}
diff --git a/src/locations.rs b/src/locations.rs
new file mode 100644
index 0000000..e5f881d
--- /dev/null
+++ b/src/locations.rs
@@ -0,0 +1,139 @@
+use serde::{Deserialize, Serialize};
+use sqlx::FromRow;
+
+#[derive(Debug, Serialize, Deserialize, FromRow, Clone, PartialEq)]
+pub struct Location {
+ pub id: i64,
+ pub latitude: f64,
+ pub longitude: f64,
+ pub user_id: i64,
+}
+
+pub struct LocationRepository<'a> {
+ pub db: &'a sqlx::SqlitePool,
+}
+
+impl<'a> LocationRepository<'a> {
+ pub async fn list_locations(&self) -> Result<Vec<Location>, sqlx::Error> {
+ sqlx::query_as::<_, Location>("SELECT id, latitude, longitude, user_id FROM locations")
+ .fetch_all(self.db)
+ .await
+ }
+
+ pub async fn get_location(&self, id: i64) -> Result<Option<Location>, sqlx::Error> {
+ sqlx::query_as::<_, Location>("SELECT id, latitude, longitude, user_id FROM locations WHERE id = ?")
+ .bind(id)
+ .fetch_optional(self.db)
+ .await
+ }
+
+ pub async fn create_location(&self, latitude: f64, longitude: f64, user_id: i64) -> Result<Location, sqlx::Error> {
+ sqlx::query_as::<_, Location>(
+ "INSERT INTO locations (latitude, longitude, user_id) VALUES (?, ?, ?) RETURNING id, latitude, longitude, user_id"
+ )
+ .bind(latitude)
+ .bind(longitude)
+ .bind(user_id)
+ .fetch_one(self.db)
+ .await
+ }
+
+ pub async fn update_location(&self, id: i64, latitude: f64, longitude: f64) -> Result<Location, sqlx::Error> {
+ sqlx::query_as::<_, Location>(
+ "UPDATE locations SET latitude = ?, longitude = ? WHERE id = ? RETURNING id, latitude, longitude, user_id"
+ )
+ .bind(latitude)
+ .bind(longitude)
+ .bind(id)
+ .fetch_one(self.db)
+ .await
+ }
+
+ pub async fn delete_location(&self, id: i64) -> Result<(), sqlx::Error> {
+ sqlx::query("DELETE FROM locations WHERE id = ?")
+ .bind(id)
+ .execute(self.db)
+ .await?;
+ Ok(())
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::users::{UserRepository, UserRole};
+ use sqlx::{SqlitePool, Executor};
+ use tokio;
+
+ async fn setup_db() -> SqlitePool {
+ let pool = SqlitePool::connect(":memory:").await.unwrap();
+ pool.execute(
+ "CREATE TABLE users (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ user_id TEXT NOT NULL UNIQUE,
+ role TEXT NOT NULL DEFAULT 'user'
+ );"
+ ).await.unwrap();
+ pool.execute(
+ "CREATE TABLE locations (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ latitude REAL NOT NULL,
+ longitude REAL NOT NULL,
+ user_id INTEGER NOT NULL,
+ FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE NO ACTION
+ );"
+ ).await.unwrap();
+ pool
+ }
+
+ async fn create_user(pool: &SqlitePool) -> i64 {
+ let repo = UserRepository { db: pool };
+ let user = repo.create_user(None, Some(UserRole::User)).await.unwrap();
+ user.id
+ }
+
+ #[tokio::test]
+ async fn test_create_and_get_location() {
+ let db = setup_db().await;
+ let user_id = create_user(&db).await;
+ let repo = LocationRepository { db: &db };
+ let loc = repo.create_location(60.0, 24.0, user_id).await.unwrap();
+ let fetched = repo.get_location(loc.id).await.unwrap().unwrap();
+ assert_eq!(fetched.latitude, 60.0);
+ assert_eq!(fetched.longitude, 24.0);
+ assert_eq!(fetched.user_id, user_id);
+ }
+
+ #[tokio::test]
+ async fn test_update_location() {
+ let db = setup_db().await;
+ let user_id = create_user(&db).await;
+ let repo = LocationRepository { db: &db };
+ let loc = repo.create_location(60.0, 24.0, user_id).await.unwrap();
+ let updated = repo.update_location(loc.id, 61.0, 25.0).await.unwrap();
+ assert_eq!(updated.latitude, 61.0);
+ assert_eq!(updated.longitude, 25.0);
+ }
+
+ #[tokio::test]
+ async fn test_delete_location() {
+ let db = setup_db().await;
+ let user_id = create_user(&db).await;
+ let repo = LocationRepository { db: &db };
+ let loc = repo.create_location(60.0, 24.0, user_id).await.unwrap();
+ repo.delete_location(loc.id).await.unwrap();
+ let fetched = repo.get_location(loc.id).await.unwrap();
+ assert!(fetched.is_none());
+ }
+
+ #[tokio::test]
+ async fn test_list_locations() {
+ let db = setup_db().await;
+ let user_id = create_user(&db).await;
+ let repo = LocationRepository { db: &db };
+ repo.create_location(60.0, 24.0, user_id).await.unwrap();
+ repo.create_location(61.0, 25.0, user_id).await.unwrap();
+ let locations = repo.list_locations().await.unwrap();
+ assert_eq!(locations.len(), 2);
+ }
+} \ No newline at end of file
diff --git a/src/notifications.rs b/src/notifications.rs
new file mode 100644
index 0000000..fb49e97
--- /dev/null
+++ b/src/notifications.rs
@@ -0,0 +1,294 @@
+use serde::{Deserialize, Serialize};
+use sqlx::FromRow;
+
+#[derive(Debug, Serialize, Deserialize, FromRow, Clone, PartialEq)]
+pub struct NtfySettings {
+ pub id: i64,
+ pub user_id: i64,
+ pub enabled: bool,
+ pub topic: String,
+ pub server_url: String,
+ pub priority: i32,
+ pub title_template: Option<String>,
+ pub message_template: Option<String>,
+}
+
+#[derive(Debug, Serialize, Deserialize, FromRow, Clone, PartialEq)]
+pub struct SmtpSettings {
+ pub id: i64,
+ pub user_id: i64,
+ pub enabled: bool,
+ pub email: String,
+ pub smtp_server: String,
+ pub smtp_port: i32,
+ pub username: Option<String>,
+ pub password: Option<String>,
+ pub use_tls: bool,
+ pub from_email: Option<String>,
+ pub from_name: Option<String>,
+ pub subject_template: Option<String>,
+ pub body_template: Option<String>,
+}
+
+pub struct NtfySettingsRepository<'a> {
+ pub db: &'a sqlx::SqlitePool,
+}
+
+impl<'a> NtfySettingsRepository<'a> {
+ pub async fn get_by_user(&self, user_id: i64) -> Result<Option<NtfySettings>, sqlx::Error> {
+ sqlx::query_as::<_, NtfySettings>(
+ "SELECT * FROM user_ntfy_settings WHERE user_id = ?"
+ )
+ .bind(user_id)
+ .fetch_optional(self.db)
+ .await
+ }
+
+ pub async fn create(&self, user_id: i64, enabled: bool, topic: String, server_url: String, priority: i32, title_template: Option<String>, message_template: Option<String>) -> Result<NtfySettings, sqlx::Error> {
+ sqlx::query_as::<_, NtfySettings>(
+ "INSERT INTO user_ntfy_settings (user_id, enabled, topic, server_url, priority, title_template, message_template) VALUES (?, ?, ?, ?, ?, ?, ?) RETURNING *"
+ )
+ .bind(user_id)
+ .bind(enabled)
+ .bind(topic)
+ .bind(server_url)
+ .bind(priority)
+ .bind(title_template)
+ .bind(message_template)
+ .fetch_one(self.db)
+ .await
+ }
+
+ pub async fn update(&self, id: i64, enabled: bool, topic: String, server_url: String, priority: i32, title_template: Option<String>, message_template: Option<String>) -> Result<NtfySettings, sqlx::Error> {
+ sqlx::query_as::<_, NtfySettings>(
+ "UPDATE user_ntfy_settings SET enabled = ?, topic = ?, server_url = ?, priority = ?, title_template = ?, message_template = ? WHERE id = ? RETURNING *"
+ )
+ .bind(enabled)
+ .bind(topic)
+ .bind(server_url)
+ .bind(priority)
+ .bind(title_template)
+ .bind(message_template)
+ .bind(id)
+ .fetch_one(self.db)
+ .await
+ }
+
+ pub async fn delete(&self, id: i64) -> Result<(), sqlx::Error> {
+ sqlx::query("DELETE FROM user_ntfy_settings WHERE id = ?")
+ .bind(id)
+ .execute(self.db)
+ .await?;
+ Ok(())
+ }
+}
+
+pub struct SmtpSettingsRepository<'a> {
+ pub db: &'a sqlx::SqlitePool,
+}
+
+impl<'a> SmtpSettingsRepository<'a> {
+ pub async fn get_by_user(&self, user_id: i64) -> Result<Option<SmtpSettings>, sqlx::Error> {
+ sqlx::query_as::<_, SmtpSettings>(
+ "SELECT * FROM user_smtp_settings WHERE user_id = ?"
+ )
+ .bind(user_id)
+ .fetch_optional(self.db)
+ .await
+ }
+
+ pub async fn create(&self, user_id: i64, enabled: bool, email: String, smtp_server: String, smtp_port: i32, username: Option<String>, password: Option<String>, use_tls: bool, from_email: Option<String>, from_name: Option<String>, subject_template: Option<String>, body_template: Option<String>) -> Result<SmtpSettings, sqlx::Error> {
+ sqlx::query_as::<_, SmtpSettings>(
+ "INSERT INTO user_smtp_settings (user_id, enabled, email, smtp_server, smtp_port, username, password, use_tls, from_email, from_name, subject_template, body_template) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) RETURNING *"
+ )
+ .bind(user_id)
+ .bind(enabled)
+ .bind(email)
+ .bind(smtp_server)
+ .bind(smtp_port)
+ .bind(username)
+ .bind(password)
+ .bind(use_tls)
+ .bind(from_email)
+ .bind(from_name)
+ .bind(subject_template)
+ .bind(body_template)
+ .fetch_one(self.db)
+ .await
+ }
+
+ pub async fn update(&self, id: i64, enabled: bool, email: String, smtp_server: String, smtp_port: i32, username: Option<String>, password: Option<String>, use_tls: bool, from_email: Option<String>, from_name: Option<String>, subject_template: Option<String>, body_template: Option<String>) -> Result<SmtpSettings, sqlx::Error> {
+ sqlx::query_as::<_, SmtpSettings>(
+ "UPDATE user_smtp_settings SET enabled = ?, email = ?, smtp_server = ?, smtp_port = ?, username = ?, password = ?, use_tls = ?, from_email = ?, from_name = ?, subject_template = ?, body_template = ? WHERE id = ? RETURNING *"
+ )
+ .bind(enabled)
+ .bind(email)
+ .bind(smtp_server)
+ .bind(smtp_port)
+ .bind(username)
+ .bind(password)
+ .bind(use_tls)
+ .bind(from_email)
+ .bind(from_name)
+ .bind(subject_template)
+ .bind(body_template)
+ .bind(id)
+ .fetch_one(self.db)
+ .await
+ }
+
+ pub async fn delete(&self, id: i64) -> Result<(), sqlx::Error> {
+ sqlx::query("DELETE FROM user_smtp_settings WHERE id = ?")
+ .bind(id)
+ .execute(self.db)
+ .await?;
+ Ok(())
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::users::{UserRepository, UserRole};
+ use sqlx::{SqlitePool, Executor};
+ use tokio;
+
+ async fn setup_db() -> SqlitePool {
+ let pool = SqlitePool::connect(":memory:").await.unwrap();
+ pool.execute(
+ "CREATE TABLE users (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ user_id TEXT NOT NULL UNIQUE,
+ role TEXT NOT NULL DEFAULT 'user'
+ );"
+ ).await.unwrap();
+ pool.execute(
+ "CREATE TABLE user_ntfy_settings (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ user_id INTEGER NOT NULL,
+ enabled BOOLEAN NOT NULL DEFAULT 0,
+ topic TEXT NOT NULL,
+ server_url TEXT NOT NULL,
+ priority INTEGER NOT NULL DEFAULT 5,
+ title_template TEXT,
+ message_template TEXT,
+ FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE
+ );"
+ ).await.unwrap();
+ pool.execute(
+ "CREATE TABLE user_smtp_settings (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ user_id INTEGER NOT NULL,
+ enabled BOOLEAN NOT NULL DEFAULT 0,
+ email TEXT NOT NULL,
+ smtp_server TEXT NOT NULL,
+ smtp_port INTEGER NOT NULL,
+ username TEXT,
+ password TEXT,
+ use_tls BOOLEAN NOT NULL DEFAULT 1,
+ from_email TEXT,
+ from_name TEXT DEFAULT 'Silmätaivas Alerts',
+ subject_template TEXT,
+ body_template TEXT,
+ FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE
+ );"
+ ).await.unwrap();
+ pool
+ }
+
+ async fn create_user(pool: &SqlitePool) -> i64 {
+ let repo = UserRepository { db: pool };
+ let user = repo.create_user(None, Some(UserRole::User)).await.unwrap();
+ user.id
+ }
+
+ #[tokio::test]
+ async fn test_create_and_get_ntfy_settings() {
+ let db = setup_db().await;
+ let user_id = create_user(&db).await;
+ let repo = NtfySettingsRepository { db: &db };
+ let settings = repo.create(user_id, true, "topic1".to_string(), "https://ntfy.sh".to_string(), 3, Some("title".to_string()), Some("msg".to_string())).await.unwrap();
+ let fetched = repo.get_by_user(user_id).await.unwrap().unwrap();
+ assert_eq!(fetched.topic, "topic1");
+ assert_eq!(fetched.server_url, "https://ntfy.sh");
+ assert_eq!(fetched.priority, 3);
+ assert_eq!(fetched.title_template, Some("title".to_string()));
+ assert_eq!(fetched.message_template, Some("msg".to_string()));
+ }
+
+ #[tokio::test]
+ async fn test_update_ntfy_settings() {
+ let db = setup_db().await;
+ let user_id = create_user(&db).await;
+ let repo = NtfySettingsRepository { db: &db };
+ let settings = repo.create(user_id, true, "topic1".to_string(), "https://ntfy.sh".to_string(), 3, None, None).await.unwrap();
+ let updated = repo.update(settings.id, false, "topic2".to_string(), "https://ntfy2.sh".to_string(), 4, Some("t2".to_string()), Some("m2".to_string())).await.unwrap();
+ assert_eq!(updated.enabled, false);
+ assert_eq!(updated.topic, "topic2");
+ assert_eq!(updated.server_url, "https://ntfy2.sh");
+ assert_eq!(updated.priority, 4);
+ assert_eq!(updated.title_template, Some("t2".to_string()));
+ assert_eq!(updated.message_template, Some("m2".to_string()));
+ }
+
+ #[tokio::test]
+ async fn test_delete_ntfy_settings() {
+ let db = setup_db().await;
+ let user_id = create_user(&db).await;
+ let repo = NtfySettingsRepository { db: &db };
+ let settings = repo.create(user_id, true, "topic1".to_string(), "https://ntfy.sh".to_string(), 3, None, None).await.unwrap();
+ repo.delete(settings.id).await.unwrap();
+ let fetched = repo.get_by_user(user_id).await.unwrap();
+ assert!(fetched.is_none());
+ }
+
+ #[tokio::test]
+ async fn test_create_and_get_smtp_settings() {
+ let db = setup_db().await;
+ let user_id = create_user(&db).await;
+ let repo = SmtpSettingsRepository { db: &db };
+ let settings = repo.create(user_id, true, "test@example.com".to_string(), "smtp.example.com".to_string(), 587, Some("user".to_string()), Some("pass".to_string()), true, Some("from@example.com".to_string()), Some("Alerts".to_string()), Some("subj".to_string()), Some("body".to_string())).await.unwrap();
+ let fetched = repo.get_by_user(user_id).await.unwrap().unwrap();
+ assert_eq!(fetched.email, "test@example.com");
+ assert_eq!(fetched.smtp_server, "smtp.example.com");
+ assert_eq!(fetched.smtp_port, 587);
+ assert_eq!(fetched.username, Some("user".to_string()));
+ assert_eq!(fetched.password, Some("pass".to_string()));
+ assert_eq!(fetched.use_tls, true);
+ assert_eq!(fetched.from_email, Some("from@example.com".to_string()));
+ assert_eq!(fetched.from_name, Some("Alerts".to_string()));
+ assert_eq!(fetched.subject_template, Some("subj".to_string()));
+ assert_eq!(fetched.body_template, Some("body".to_string()));
+ }
+
+ #[tokio::test]
+ async fn test_update_smtp_settings() {
+ let db = setup_db().await;
+ let user_id = create_user(&db).await;
+ let repo = SmtpSettingsRepository { db: &db };
+ let settings = repo.create(user_id, true, "test@example.com".to_string(), "smtp.example.com".to_string(), 587, None, None, true, None, None, None, None).await.unwrap();
+ let updated = repo.update(settings.id, false, "other@example.com".to_string(), "smtp2.example.com".to_string(), 465, Some("u2".to_string()), Some("p2".to_string()), false, Some("f2@example.com".to_string()), Some("N2".to_string()), Some("s2".to_string()), Some("b2".to_string())).await.unwrap();
+ assert_eq!(updated.enabled, false);
+ assert_eq!(updated.email, "other@example.com");
+ assert_eq!(updated.smtp_server, "smtp2.example.com");
+ assert_eq!(updated.smtp_port, 465);
+ assert_eq!(updated.username, Some("u2".to_string()));
+ assert_eq!(updated.password, Some("p2".to_string()));
+ assert_eq!(updated.use_tls, false);
+ assert_eq!(updated.from_email, Some("f2@example.com".to_string()));
+ assert_eq!(updated.from_name, Some("N2".to_string()));
+ assert_eq!(updated.subject_template, Some("s2".to_string()));
+ assert_eq!(updated.body_template, Some("b2".to_string()));
+ }
+
+ #[tokio::test]
+ async fn test_delete_smtp_settings() {
+ let db = setup_db().await;
+ let user_id = create_user(&db).await;
+ let repo = SmtpSettingsRepository { db: &db };
+ let settings = repo.create(user_id, true, "test@example.com".to_string(), "smtp.example.com".to_string(), 587, None, None, true, None, None, None, None).await.unwrap();
+ repo.delete(settings.id).await.unwrap();
+ let fetched = repo.get_by_user(user_id).await.unwrap();
+ assert!(fetched.is_none());
+ }
+} \ No newline at end of file
diff --git a/src/users.rs b/src/users.rs
new file mode 100644
index 0000000..baca6dd
--- /dev/null
+++ b/src/users.rs
@@ -0,0 +1,139 @@
+use serde::{Deserialize, Serialize};
+use sqlx::FromRow;
+use uuid::Uuid;
+
+#[derive(Debug, Serialize, Deserialize, FromRow, Clone, PartialEq, Eq)]
+pub struct User {
+ pub id: i64,
+ pub user_id: String, // API token
+ pub role: UserRole,
+}
+
+#[derive(Debug, Serialize, Deserialize, sqlx::Type, Clone, PartialEq, Eq)]
+#[sqlx(type_name = "TEXT")]
+pub enum UserRole {
+ #[serde(rename = "user")]
+ User,
+ #[serde(rename = "admin")]
+ Admin,
+}
+
+impl Default for UserRole {
+ fn default() -> Self {
+ UserRole::User
+ }
+}
+
+pub struct UserRepository<'a> {
+ pub db: &'a sqlx::SqlitePool,
+}
+
+impl<'a> UserRepository<'a> {
+ pub async fn list_users(&self) -> Result<Vec<User>, sqlx::Error> {
+ sqlx::query_as::<_, User>("SELECT id, user_id, role FROM users")
+ .fetch_all(self.db)
+ .await
+ }
+
+ pub async fn get_user_by_id(&self, id: i64) -> Result<Option<User>, sqlx::Error> {
+ sqlx::query_as::<_, User>("SELECT id, user_id, role FROM users WHERE id = ?")
+ .bind(id)
+ .fetch_optional(self.db)
+ .await
+ }
+
+ pub async fn get_user_by_user_id(&self, user_id: &str) -> Result<Option<User>, sqlx::Error> {
+ sqlx::query_as::<_, User>("SELECT id, user_id, role FROM users WHERE user_id = ?")
+ .bind(user_id)
+ .fetch_optional(self.db)
+ .await
+ }
+
+ pub async fn create_user(&self, user_id: Option<String>, role: Option<UserRole>) -> Result<User, sqlx::Error> {
+ let user_id = user_id.unwrap_or_else(|| Uuid::new_v4().to_string());
+ let role = role.unwrap_or_default();
+ sqlx::query_as::<_, User>(
+ "INSERT INTO users (user_id, role) VALUES (?, ?) RETURNING id, user_id, role"
+ )
+ .bind(user_id)
+ .bind(role)
+ .fetch_one(self.db)
+ .await
+ }
+
+ pub async fn update_user(&self, id: i64, role: UserRole) -> Result<User, sqlx::Error> {
+ sqlx::query_as::<_, User>(
+ "UPDATE users SET role = ? WHERE id = ? RETURNING id, user_id, role"
+ )
+ .bind(role)
+ .bind(id)
+ .fetch_one(self.db)
+ .await
+ }
+
+ pub async fn delete_user(&self, id: i64) -> Result<(), sqlx::Error> {
+ sqlx::query("DELETE FROM users WHERE id = ?")
+ .bind(id)
+ .execute(self.db)
+ .await?;
+ Ok(())
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use sqlx::{SqlitePool, Executor};
+ use tokio;
+
+ async fn setup_db() -> SqlitePool {
+ let pool = SqlitePool::connect(":memory:").await.unwrap();
+ pool.execute(
+ "CREATE TABLE users (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ user_id TEXT NOT NULL UNIQUE,
+ role TEXT NOT NULL DEFAULT 'user'
+ );"
+ ).await.unwrap();
+ pool
+ }
+
+ #[tokio::test]
+ async fn test_create_and_get_user() {
+ let db = setup_db().await;
+ let repo = UserRepository { db: &db };
+ let user = repo.create_user(None, Some(UserRole::Admin)).await.unwrap();
+ assert_eq!(user.role, UserRole::Admin);
+ let fetched = repo.get_user_by_user_id(&user.user_id).await.unwrap().unwrap();
+ assert_eq!(fetched.user_id, user.user_id);
+ }
+
+ #[tokio::test]
+ async fn test_update_user() {
+ let db = setup_db().await;
+ let repo = UserRepository { db: &db };
+ let user = repo.create_user(None, Some(UserRole::User)).await.unwrap();
+ let updated = repo.update_user(user.id, UserRole::Admin).await.unwrap();
+ assert_eq!(updated.role, UserRole::Admin);
+ }
+
+ #[tokio::test]
+ async fn test_delete_user() {
+ let db = setup_db().await;
+ let repo = UserRepository { db: &db };
+ let user = repo.create_user(None, None).await.unwrap();
+ repo.delete_user(user.id).await.unwrap();
+ let fetched = repo.get_user_by_id(user.id).await.unwrap();
+ assert!(fetched.is_none());
+ }
+
+ #[tokio::test]
+ async fn test_list_users() {
+ let db = setup_db().await;
+ let repo = UserRepository { db: &db };
+ repo.create_user(None, Some(UserRole::User)).await.unwrap();
+ repo.create_user(None, Some(UserRole::Admin)).await.unwrap();
+ let users = repo.list_users().await.unwrap();
+ assert_eq!(users.len(), 2);
+ }
+} \ No newline at end of file
diff --git a/src/weather_poller.rs b/src/weather_poller.rs
new file mode 100644
index 0000000..056cef8
--- /dev/null
+++ b/src/weather_poller.rs
@@ -0,0 +1,192 @@
+use crate::users::UserRepository;
+use crate::locations::LocationRepository;
+use crate::weather_thresholds::{WeatherThresholdRepository, WeatherThreshold};
+use crate::notifications::{NtfySettingsRepository, SmtpSettingsRepository, NtfySettings, SmtpSettings};
+use serde_json::Value;
+use tera::{Tera, Context};
+use reqwest::Client;
+use std::sync::{Arc, Mutex};
+use tokio_task_scheduler::{Scheduler, Task};
+use tokio::time::Duration;
+
+const OWM_API_URL: &str = "https://api.openweathermap.org/data/2.5/forecast";
+const ALERT_WINDOW_HOURS: i64 = 24;
+
+pub struct WeatherPoller {
+ pub db: Arc<sqlx::SqlitePool>,
+ pub owm_api_key: String,
+ pub tera: Arc<Mutex<Tera>>,
+}
+
+impl WeatherPoller {
+ pub async fn check_all(&self) {
+ let user_repo = UserRepository { db: &self.db };
+ let loc_repo = LocationRepository { db: &self.db };
+ let users = user_repo.list_users().await.unwrap_or_default();
+ for user in users {
+ if let Some(location) = loc_repo.list_locations().await.unwrap_or_default().into_iter().find(|l| l.user_id == user.id) {
+ self.check_user_weather(user.id, location.latitude, location.longitude).await;
+ }
+ }
+ }
+
+ pub async fn check_user_weather(&self, user_id: i64, lat: f64, lon: f64) {
+ if let Ok(Some(forecast)) = self.fetch_forecast(lat, lon).await {
+ let threshold_repo = WeatherThresholdRepository { db: &self.db };
+ let thresholds = threshold_repo.list_thresholds(user_id).await.unwrap_or_default().into_iter().filter(|t| t.enabled).collect::<Vec<_>>();
+ if let Some(entry) = find_first_alert_entry(&forecast, &thresholds) {
+ self.send_notifications(user_id, &entry).await;
+ }
+ }
+ }
+
+ pub async fn fetch_forecast(&self, lat: f64, lon: f64) -> Result<Option<Vec<Value>>, reqwest::Error> {
+ let client = Client::new();
+ let resp = client.get(OWM_API_URL)
+ .query(&[
+ ("lat", lat.to_string()),
+ ("lon", lon.to_string()),
+ ("units", "metric".to_string()),
+ ("appid", self.owm_api_key.clone()),
+ ])
+ .send()
+ .await?;
+ let json: Value = resp.json().await?;
+ Ok(json["list"].as_array().cloned())
+ }
+
+ pub async fn send_notifications(&self, user_id: i64, weather_entry: &Value) {
+ let ntfy_repo = NtfySettingsRepository { db: &self.db };
+ let smtp_repo = SmtpSettingsRepository { db: &self.db };
+ let tera = self.tera.clone();
+ if let Some(ntfy) = ntfy_repo.get_by_user(user_id).await.unwrap_or(None) {
+ if ntfy.enabled {
+ send_ntfy_notification(&ntfy, weather_entry, tera.clone()).await;
+ }
+ }
+ if let Some(smtp) = smtp_repo.get_by_user(user_id).await.unwrap_or(None) {
+ if smtp.enabled {
+ send_smtp_notification(&smtp, weather_entry, tera.clone()).await;
+ }
+ }
+ }
+}
+
+fn find_first_alert_entry(forecast: &[Value], thresholds: &[WeatherThreshold]) -> Option<Value> {
+ use chrono::{Utc, TimeZone};
+ let now = Utc::now();
+ for entry in forecast {
+ if let Some(ts) = entry["dt"].as_i64() {
+ let forecast_time = Utc.timestamp_opt(ts, 0).single()?;
+ if (forecast_time - now).num_hours() > ALERT_WINDOW_HOURS {
+ break;
+ }
+ if thresholds.iter().any(|t| threshold_triggered(t, entry)) {
+ return Some(entry.clone());
+ }
+ }
+ }
+ None
+}
+
+fn threshold_triggered(threshold: &WeatherThreshold, entry: &Value) -> bool {
+ let value = match threshold.condition_type.as_str() {
+ "wind_speed" => entry.pointer("/wind/speed").and_then(|v| v.as_f64()).unwrap_or(0.0) * 3.6,
+ "rain" => entry.pointer("/rain/3h").and_then(|v| v.as_f64()).unwrap_or(0.0),
+ "temp_min" | "temp_max" => entry.pointer("/main/temp").and_then(|v| v.as_f64()).unwrap_or(0.0),
+ _ => return false,
+ };
+ compare(value, &threshold.operator, threshold.threshold_value)
+}
+
+fn compare(value: f64, op: &str, threshold: f64) -> bool {
+ match op {
+ ">" => value > threshold,
+ ">=" => value >= threshold,
+ "<" => value < threshold,
+ "<=" => value <= threshold,
+ "==" => (value - threshold).abs() < std::f64::EPSILON,
+ _ => false,
+ }
+}
+
+async fn send_ntfy_notification(ntfy: &NtfySettings, weather_entry: &Value, tera: Arc<Mutex<Tera>>) {
+ let mut ctx = Context::new();
+ add_weather_context(&mut ctx, weather_entry);
+ let title = if let Some(tpl) = &ntfy.title_template {
+ let mut tera = tera.lock().unwrap();
+ tera.render_str(tpl, &ctx).unwrap_or_else(|_| "🚨 Weather Alert".to_string())
+ } else {
+ "🚨 Weather Alert".to_string()
+ };
+ let message = if let Some(tpl) = &ntfy.message_template {
+ let mut tera = tera.lock().unwrap();
+ tera.render_str(tpl, &ctx).unwrap_or_else(|_| "🚨 Weather alert for your location".to_string())
+ } else {
+ default_weather_message(weather_entry)
+ };
+ let client = Client::new();
+ let _ = client.post(&format!("{}/{}", ntfy.server_url, ntfy.topic))
+ .header("Priority", ntfy.priority.to_string())
+ .header("Title", title)
+ .body(message)
+ .send()
+ .await;
+}
+
+async fn send_smtp_notification(smtp: &SmtpSettings, weather_entry: &Value, tera: Arc<Mutex<Tera>>) {
+ use lettre::{Message, SmtpTransport, Transport, transport::smtp::authentication::Credentials};
+ let mut ctx = Context::new();
+ add_weather_context(&mut ctx, weather_entry);
+ let subject = if let Some(tpl) = &smtp.subject_template {
+ let mut tera = tera.lock().unwrap();
+ tera.render_str(tpl, &ctx).unwrap_or_else(|_| "⚠️ Weather Alert for Your Location".to_string())
+ } else {
+ "⚠️ Weather Alert for Your Location".to_string()
+ };
+ let body = if let Some(tpl) = &smtp.body_template {
+ let mut tera = tera.lock().unwrap();
+ tera.render_str(tpl, &ctx).unwrap_or_else(|_| default_weather_message(weather_entry))
+ } else {
+ default_weather_message(weather_entry)
+ };
+ let from = smtp.from_email.clone().unwrap_or_else(|| smtp.email.clone());
+ let from_name = smtp.from_name.clone().unwrap_or_else(|| "Silmätaivas Alerts".to_string());
+ let email = Message::builder()
+ .from(format!("{} <{}>", from_name, from).parse().unwrap())
+ .to(smtp.email.parse().unwrap())
+ .subject(subject)
+ .body(body)
+ .unwrap();
+ let creds = smtp.username.as_ref().and_then(|u| smtp.password.as_ref().map(|p| Credentials::new(u.clone(), p.clone())));
+ let mailer = if let Some(creds) = creds {
+ SmtpTransport::relay(&smtp.smtp_server).unwrap()
+ .port(smtp.smtp_port as u16)
+ .credentials(creds)
+ .build()
+ } else {
+ SmtpTransport::relay(&smtp.smtp_server).unwrap()
+ .port(smtp.smtp_port as u16)
+ .build()
+ };
+ let _ = mailer.send(&email);
+}
+
+fn add_weather_context(ctx: &mut Context, entry: &Value) {
+ ctx.insert("temp", &entry.pointer("/main/temp").and_then(|v| v.as_f64()).unwrap_or(0.0));
+ ctx.insert("wind_speed", &(entry.pointer("/wind/speed").and_then(|v| v.as_f64()).unwrap_or(0.0) * 3.6));
+ ctx.insert("rain", &entry.pointer("/rain/3h").and_then(|v| v.as_f64()).unwrap_or(0.0));
+ ctx.insert("time", &entry["dt_txt"].as_str().unwrap_or("N/A"));
+ ctx.insert("humidity", &entry.pointer("/main/humidity").and_then(|v| v.as_f64()).unwrap_or(0.0));
+ ctx.insert("pressure", &entry.pointer("/main/pressure").and_then(|v| v.as_f64()).unwrap_or(0.0));
+}
+
+fn default_weather_message(entry: &Value) -> String {
+ let temp = entry.pointer("/main/temp").and_then(|v| v.as_f64()).unwrap_or(0.0);
+ let wind = entry.pointer("/wind/speed").and_then(|v| v.as_f64()).unwrap_or(0.0) * 3.6;
+ let rain = entry.pointer("/rain/3h").and_then(|v| v.as_f64()).unwrap_or(0.0);
+ let time = entry["dt_txt"].as_str().unwrap_or("N/A");
+ format!("🚨 Weather alert for your location ({}):\n\n🌬️ Wind: {:.1} km/h\n🌧️ Rain: {:.1} mm\n🌡️ Temperature: {:.1} °C\n\nStay safe,\n— Silmätaivas", time, wind, rain, temp)
+}
+
+// Unit tests for threshold logic and template rendering can be added here. \ No newline at end of file
diff --git a/src/weather_thresholds.rs b/src/weather_thresholds.rs
new file mode 100644
index 0000000..bfb9cdf
--- /dev/null
+++ b/src/weather_thresholds.rs
@@ -0,0 +1,165 @@
+use serde::{Deserialize, Serialize};
+use sqlx::FromRow;
+
+#[derive(Debug, Serialize, Deserialize, FromRow, Clone, PartialEq)]
+pub struct WeatherThreshold {
+ pub id: i64,
+ pub user_id: i64,
+ pub condition_type: String,
+ pub threshold_value: f64,
+ pub operator: String,
+ pub enabled: bool,
+ pub description: Option<String>,
+}
+
+pub struct WeatherThresholdRepository<'a> {
+ pub db: &'a sqlx::SqlitePool,
+}
+
+impl<'a> WeatherThresholdRepository<'a> {
+ pub async fn list_thresholds(&self, user_id: i64) -> Result<Vec<WeatherThreshold>, sqlx::Error> {
+ sqlx::query_as::<_, WeatherThreshold>(
+ "SELECT id, user_id, condition_type, threshold_value, operator, enabled, description FROM weather_thresholds WHERE user_id = ?"
+ )
+ .bind(user_id)
+ .fetch_all(self.db)
+ .await
+ }
+
+ pub async fn get_threshold(&self, id: i64, user_id: i64) -> Result<Option<WeatherThreshold>, sqlx::Error> {
+ sqlx::query_as::<_, WeatherThreshold>(
+ "SELECT id, user_id, condition_type, threshold_value, operator, enabled, description FROM weather_thresholds WHERE id = ? AND user_id = ?"
+ )
+ .bind(id)
+ .bind(user_id)
+ .fetch_optional(self.db)
+ .await
+ }
+
+ pub async fn create_threshold(&self, user_id: i64, condition_type: String, threshold_value: f64, operator: String, enabled: bool, description: Option<String>) -> Result<WeatherThreshold, sqlx::Error> {
+ sqlx::query_as::<_, WeatherThreshold>(
+ "INSERT INTO weather_thresholds (user_id, condition_type, threshold_value, operator, enabled, description) VALUES (?, ?, ?, ?, ?, ?) RETURNING id, user_id, condition_type, threshold_value, operator, enabled, description"
+ )
+ .bind(user_id)
+ .bind(condition_type)
+ .bind(threshold_value)
+ .bind(operator)
+ .bind(enabled)
+ .bind(description)
+ .fetch_one(self.db)
+ .await
+ }
+
+ pub async fn update_threshold(&self, id: i64, user_id: i64, condition_type: String, threshold_value: f64, operator: String, enabled: bool, description: Option<String>) -> Result<WeatherThreshold, sqlx::Error> {
+ sqlx::query_as::<_, WeatherThreshold>(
+ "UPDATE weather_thresholds SET condition_type = ?, threshold_value = ?, operator = ?, enabled = ?, description = ? WHERE id = ? AND user_id = ? RETURNING id, user_id, condition_type, threshold_value, operator, enabled, description"
+ )
+ .bind(condition_type)
+ .bind(threshold_value)
+ .bind(operator)
+ .bind(enabled)
+ .bind(description)
+ .bind(id)
+ .bind(user_id)
+ .fetch_one(self.db)
+ .await
+ }
+
+ pub async fn delete_threshold(&self, id: i64, user_id: i64) -> Result<(), sqlx::Error> {
+ sqlx::query("DELETE FROM weather_thresholds WHERE id = ? AND user_id = ?")
+ .bind(id)
+ .bind(user_id)
+ .execute(self.db)
+ .await?;
+ Ok(())
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::users::{UserRepository, UserRole};
+ use sqlx::{SqlitePool, Executor};
+ use tokio;
+
+ async fn setup_db() -> SqlitePool {
+ let pool = SqlitePool::connect(":memory:").await.unwrap();
+ pool.execute(
+ "CREATE TABLE users (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ user_id TEXT NOT NULL UNIQUE,
+ role TEXT NOT NULL DEFAULT 'user'
+ );"
+ ).await.unwrap();
+ pool.execute(
+ "CREATE TABLE weather_thresholds (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ user_id INTEGER NOT NULL,
+ condition_type TEXT NOT NULL,
+ threshold_value REAL NOT NULL,
+ operator TEXT NOT NULL,
+ enabled BOOLEAN NOT NULL DEFAULT 1,
+ description TEXT,
+ FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE
+ );"
+ ).await.unwrap();
+ pool
+ }
+
+ async fn create_user(pool: &SqlitePool) -> i64 {
+ let repo = UserRepository { db: pool };
+ let user = repo.create_user(None, Some(UserRole::User)).await.unwrap();
+ user.id
+ }
+
+ #[tokio::test]
+ async fn test_create_and_get_threshold() {
+ let db = setup_db().await;
+ let user_id = create_user(&db).await;
+ let repo = WeatherThresholdRepository { db: &db };
+ let th = repo.create_threshold(user_id, "wind_speed".to_string(), 10.0, ">".to_string(), true, Some("desc".to_string())).await.unwrap();
+ let fetched = repo.get_threshold(th.id, user_id).await.unwrap().unwrap();
+ assert_eq!(fetched.condition_type, "wind_speed");
+ assert_eq!(fetched.threshold_value, 10.0);
+ assert_eq!(fetched.operator, ">"
+ );
+ assert_eq!(fetched.enabled, true);
+ assert_eq!(fetched.description, Some("desc".to_string()));
+ }
+
+ #[tokio::test]
+ async fn test_update_threshold() {
+ let db = setup_db().await;
+ let user_id = create_user(&db).await;
+ let repo = WeatherThresholdRepository { db: &db };
+ let th = repo.create_threshold(user_id, "wind_speed".to_string(), 10.0, ">".to_string(), true, None).await.unwrap();
+ let updated = repo.update_threshold(th.id, user_id, "rain".to_string(), 5.0, "<".to_string(), false, Some("rain desc".to_string())).await.unwrap();
+ assert_eq!(updated.condition_type, "rain");
+ assert_eq!(updated.threshold_value, 5.0);
+ assert_eq!(updated.operator, "<");
+ assert_eq!(updated.enabled, false);
+ assert_eq!(updated.description, Some("rain desc".to_string()));
+ }
+
+ #[tokio::test]
+ async fn test_delete_threshold() {
+ let db = setup_db().await;
+ let user_id = create_user(&db).await;
+ let repo = WeatherThresholdRepository { db: &db };
+ let th = repo.create_threshold(user_id, "wind_speed".to_string(), 10.0, ">".to_string(), true, None).await.unwrap();
+ repo.delete_threshold(th.id, user_id).await.unwrap();
+ let fetched = repo.get_threshold(th.id, user_id).await.unwrap();
+ assert!(fetched.is_none());
+ }
+
+ #[tokio::test]
+ async fn test_list_thresholds() {
+ let db = setup_db().await;
+ let user_id = create_user(&db).await;
+ let repo = WeatherThresholdRepository { db: &db };
+ repo.create_threshold(user_id, "wind_speed".to_string(), 10.0, ">".to_string(), true, None).await.unwrap();
+ repo.create_threshold(user_id, "rain".to_string(), 5.0, "<".to_string(), false, None).await.unwrap();
+ let thresholds = repo.list_thresholds(user_id).await.unwrap();
+ assert_eq!(thresholds.len(), 2);
+ }
+} \ No newline at end of file