From 50ce8cb96b2b218751c2fc2a6b19372f51846acc Mon Sep 17 00:00:00 2001 From: Dawid Rycerz Date: Mon, 14 Jul 2025 19:34:59 +0300 Subject: feat: rewrite in rust --- src/weather_poller.rs | 192 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 192 insertions(+) create mode 100644 src/weather_poller.rs (limited to 'src/weather_poller.rs') diff --git a/src/weather_poller.rs b/src/weather_poller.rs new file mode 100644 index 0000000..056cef8 --- /dev/null +++ b/src/weather_poller.rs @@ -0,0 +1,192 @@ +use crate::users::UserRepository; +use crate::locations::LocationRepository; +use crate::weather_thresholds::{WeatherThresholdRepository, WeatherThreshold}; +use crate::notifications::{NtfySettingsRepository, SmtpSettingsRepository, NtfySettings, SmtpSettings}; +use serde_json::Value; +use tera::{Tera, Context}; +use reqwest::Client; +use std::sync::{Arc, Mutex}; +use tokio_task_scheduler::{Scheduler, Task}; +use tokio::time::Duration; + +const OWM_API_URL: &str = "https://api.openweathermap.org/data/2.5/forecast"; +const ALERT_WINDOW_HOURS: i64 = 24; + +pub struct WeatherPoller { + pub db: Arc, + pub owm_api_key: String, + pub tera: Arc>, +} + +impl WeatherPoller { + pub async fn check_all(&self) { + let user_repo = UserRepository { db: &self.db }; + let loc_repo = LocationRepository { db: &self.db }; + let users = user_repo.list_users().await.unwrap_or_default(); + for user in users { + if let Some(location) = loc_repo.list_locations().await.unwrap_or_default().into_iter().find(|l| l.user_id == user.id) { + self.check_user_weather(user.id, location.latitude, location.longitude).await; + } + } + } + + pub async fn check_user_weather(&self, user_id: i64, lat: f64, lon: f64) { + if let Ok(Some(forecast)) = self.fetch_forecast(lat, lon).await { + let threshold_repo = WeatherThresholdRepository { db: &self.db }; + let thresholds = threshold_repo.list_thresholds(user_id).await.unwrap_or_default().into_iter().filter(|t| t.enabled).collect::>(); + if let Some(entry) = find_first_alert_entry(&forecast, &thresholds) { + self.send_notifications(user_id, &entry).await; + } + } + } + + pub async fn fetch_forecast(&self, lat: f64, lon: f64) -> Result>, reqwest::Error> { + let client = Client::new(); + let resp = client.get(OWM_API_URL) + .query(&[ + ("lat", lat.to_string()), + ("lon", lon.to_string()), + ("units", "metric".to_string()), + ("appid", self.owm_api_key.clone()), + ]) + .send() + .await?; + let json: Value = resp.json().await?; + Ok(json["list"].as_array().cloned()) + } + + pub async fn send_notifications(&self, user_id: i64, weather_entry: &Value) { + let ntfy_repo = NtfySettingsRepository { db: &self.db }; + let smtp_repo = SmtpSettingsRepository { db: &self.db }; + let tera = self.tera.clone(); + if let Some(ntfy) = ntfy_repo.get_by_user(user_id).await.unwrap_or(None) { + if ntfy.enabled { + send_ntfy_notification(&ntfy, weather_entry, tera.clone()).await; + } + } + if let Some(smtp) = smtp_repo.get_by_user(user_id).await.unwrap_or(None) { + if smtp.enabled { + send_smtp_notification(&smtp, weather_entry, tera.clone()).await; + } + } + } +} + +fn find_first_alert_entry(forecast: &[Value], thresholds: &[WeatherThreshold]) -> Option { + use chrono::{Utc, TimeZone}; + let now = Utc::now(); + for entry in forecast { + if let Some(ts) = entry["dt"].as_i64() { + let forecast_time = Utc.timestamp_opt(ts, 0).single()?; + if (forecast_time - now).num_hours() > ALERT_WINDOW_HOURS { + break; + } + if thresholds.iter().any(|t| threshold_triggered(t, entry)) { + return Some(entry.clone()); + } + } + } + None +} + +fn threshold_triggered(threshold: &WeatherThreshold, entry: &Value) -> bool { + let value = match threshold.condition_type.as_str() { + "wind_speed" => entry.pointer("/wind/speed").and_then(|v| v.as_f64()).unwrap_or(0.0) * 3.6, + "rain" => entry.pointer("/rain/3h").and_then(|v| v.as_f64()).unwrap_or(0.0), + "temp_min" | "temp_max" => entry.pointer("/main/temp").and_then(|v| v.as_f64()).unwrap_or(0.0), + _ => return false, + }; + compare(value, &threshold.operator, threshold.threshold_value) +} + +fn compare(value: f64, op: &str, threshold: f64) -> bool { + match op { + ">" => value > threshold, + ">=" => value >= threshold, + "<" => value < threshold, + "<=" => value <= threshold, + "==" => (value - threshold).abs() < std::f64::EPSILON, + _ => false, + } +} + +async fn send_ntfy_notification(ntfy: &NtfySettings, weather_entry: &Value, tera: Arc>) { + let mut ctx = Context::new(); + add_weather_context(&mut ctx, weather_entry); + let title = if let Some(tpl) = &ntfy.title_template { + let mut tera = tera.lock().unwrap(); + tera.render_str(tpl, &ctx).unwrap_or_else(|_| "🚨 Weather Alert".to_string()) + } else { + "🚨 Weather Alert".to_string() + }; + let message = if let Some(tpl) = &ntfy.message_template { + let mut tera = tera.lock().unwrap(); + tera.render_str(tpl, &ctx).unwrap_or_else(|_| "🚨 Weather alert for your location".to_string()) + } else { + default_weather_message(weather_entry) + }; + let client = Client::new(); + let _ = client.post(&format!("{}/{}", ntfy.server_url, ntfy.topic)) + .header("Priority", ntfy.priority.to_string()) + .header("Title", title) + .body(message) + .send() + .await; +} + +async fn send_smtp_notification(smtp: &SmtpSettings, weather_entry: &Value, tera: Arc>) { + use lettre::{Message, SmtpTransport, Transport, transport::smtp::authentication::Credentials}; + let mut ctx = Context::new(); + add_weather_context(&mut ctx, weather_entry); + let subject = if let Some(tpl) = &smtp.subject_template { + let mut tera = tera.lock().unwrap(); + tera.render_str(tpl, &ctx).unwrap_or_else(|_| "⚠️ Weather Alert for Your Location".to_string()) + } else { + "⚠️ Weather Alert for Your Location".to_string() + }; + let body = if let Some(tpl) = &smtp.body_template { + let mut tera = tera.lock().unwrap(); + tera.render_str(tpl, &ctx).unwrap_or_else(|_| default_weather_message(weather_entry)) + } else { + default_weather_message(weather_entry) + }; + let from = smtp.from_email.clone().unwrap_or_else(|| smtp.email.clone()); + let from_name = smtp.from_name.clone().unwrap_or_else(|| "Silmätaivas Alerts".to_string()); + let email = Message::builder() + .from(format!("{} <{}>", from_name, from).parse().unwrap()) + .to(smtp.email.parse().unwrap()) + .subject(subject) + .body(body) + .unwrap(); + let creds = smtp.username.as_ref().and_then(|u| smtp.password.as_ref().map(|p| Credentials::new(u.clone(), p.clone()))); + let mailer = if let Some(creds) = creds { + SmtpTransport::relay(&smtp.smtp_server).unwrap() + .port(smtp.smtp_port as u16) + .credentials(creds) + .build() + } else { + SmtpTransport::relay(&smtp.smtp_server).unwrap() + .port(smtp.smtp_port as u16) + .build() + }; + let _ = mailer.send(&email); +} + +fn add_weather_context(ctx: &mut Context, entry: &Value) { + ctx.insert("temp", &entry.pointer("/main/temp").and_then(|v| v.as_f64()).unwrap_or(0.0)); + ctx.insert("wind_speed", &(entry.pointer("/wind/speed").and_then(|v| v.as_f64()).unwrap_or(0.0) * 3.6)); + ctx.insert("rain", &entry.pointer("/rain/3h").and_then(|v| v.as_f64()).unwrap_or(0.0)); + ctx.insert("time", &entry["dt_txt"].as_str().unwrap_or("N/A")); + ctx.insert("humidity", &entry.pointer("/main/humidity").and_then(|v| v.as_f64()).unwrap_or(0.0)); + ctx.insert("pressure", &entry.pointer("/main/pressure").and_then(|v| v.as_f64()).unwrap_or(0.0)); +} + +fn default_weather_message(entry: &Value) -> String { + let temp = entry.pointer("/main/temp").and_then(|v| v.as_f64()).unwrap_or(0.0); + let wind = entry.pointer("/wind/speed").and_then(|v| v.as_f64()).unwrap_or(0.0) * 3.6; + let rain = entry.pointer("/rain/3h").and_then(|v| v.as_f64()).unwrap_or(0.0); + let time = entry["dt_txt"].as_str().unwrap_or("N/A"); + format!("🚨 Weather alert for your location ({}):\n\n🌬️ Wind: {:.1} km/h\n🌧️ Rain: {:.1} mm\n🌡️ Temperature: {:.1} °C\n\nStay safe,\n— Silmätaivas", time, wind, rain, temp) +} + +// Unit tests for threshold logic and template rendering can be added here. \ No newline at end of file -- cgit v1.2.3