//! Polling manager for periodic repository change detection.
//!
//! Spawns background tasks for sites with `poll_interval` configured.
//! Integrates with SIGHUP reload to restart polling tasks on config change.

use crate::build_guard::BuildGuard;
use crate::config::SiteConfig;
use crate::git;
use crate::server::AppState;
use log::{debug, error, info};
use std::collections::HashMap;
use std::hash::{Hash as _, Hasher as _};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{RwLock, watch};

/// Manages polling tasks for all sites.
pub struct PollingManager {
    /// Map of `site_name` -> cancellation sender for active polling tasks
    tasks: Arc<RwLock<HashMap<String, watch::Sender<()>>>>,
}

impl PollingManager {
    #[must_use]
    pub fn new() -> Self {
        Self {
            tasks: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Start polling tasks for sites with `poll_interval` configured.
    /// Call this on startup and after SIGHUP reload.
    ///
    /// # Panics
    ///
    /// Panics if the config `RwLock` is poisoned.
    pub async fn start_polling(&self, state: AppState) {
        let sites: Vec<_> = state
            .config
            .read()
            .expect("config lock poisoned")
            .sites
            .clone();

        for site in &sites {
            if let Some(interval) = site.poll_interval {
                self.spawn_poll_task(state.clone(), site.clone(), interval)
                    .await;
            }
        }
    }

    /// Stop all currently running polling tasks.
    /// Call this before starting new tasks on SIGHUP.
    pub async fn stop_all(&self) {
        let mut tasks = self.tasks.write().await;
        for (site_name, tx) in tasks.drain() {
            info!("[{site_name}] stopping polling task");
            let _ = tx.send(());
        }
    }

    /// Spawn a single polling task for a site.
    async fn spawn_poll_task(&self, state: AppState, site: SiteConfig, interval: Duration) {
        let site_name = site.name.clone();
        let (cancel_tx, cancel_rx) = watch::channel(());

        // Store the cancellation sender
        {
            let mut tasks = self.tasks.write().await;
            tasks.insert(site_name.clone(), cancel_tx);
        }

        info!(
            "[{site_name}] starting polling task: interval_secs={}",
            interval.as_secs()
        );

        // Spawn the polling loop
        let tasks = Arc::clone(&self.tasks);
        tokio::spawn(async move {
            #[allow(clippy::large_futures)]
            poll_loop(state, site, interval, cancel_rx).await;
            // Remove from active tasks when done
            tasks.write().await.remove(&site_name);
            debug!("[{site_name}] polling task ended");
        });
    }
}

impl Default for PollingManager {
    fn default() -> Self {
        Self::new()
    }
}
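// A minimal sketch of the reload sequence this manager is designed around
// (hypothetical call site; assumes a SIGHUP handler that has already
// refreshed the shared config):
//
//     manager.stop_all().await;            // signal every running poll loop
//     manager.start_polling(state).await;  // respawn tasks from new config
//
// Cancellation is cooperative: `stop_all` drains the task map and fires each
// watch channel, so a loop exits at its next `select!` point rather than
// being aborted in the middle of a build.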
/// The main polling loop for a single site.
async fn poll_loop(
    state: AppState,
    site: SiteConfig,
    interval: Duration,
    mut cancel_rx: watch::Receiver<()>,
) {
    let site_name = &site.name;

    // Initial delay before first poll (avoid thundering herd on startup)
    let initial_delay = calculate_initial_delay(site_name, interval);
    debug!(
        "[{site_name}] initial poll delay: {} secs",
        initial_delay.as_secs()
    );
    tokio::select! {
        () = tokio::time::sleep(initial_delay) => {}
        _ = cancel_rx.changed() => return,
    }

    loop {
        debug!("[{site_name}] polling for changes");

        // 1. Acquire build lock before any git operation
        let Some(guard) = BuildGuard::try_acquire(site_name.clone(), &state.build_scheduler)
        else {
            debug!("[{site_name}] build in progress, skipping poll cycle");
            tokio::select! {
                () = tokio::time::sleep(interval) => {}
                _ = cancel_rx.changed() => {
                    info!("[{site_name}] polling cancelled");
                    return;
                }
            }
            continue;
        };

        // Get current config (might have changed via SIGHUP)
        let (base_dir, git_timeout) = {
            let config = state.config.read().expect("config lock poisoned");
            (
                config.base_dir.clone(),
                config.git_timeout.unwrap_or(git::GIT_TIMEOUT_DEFAULT),
            )
        };
        let clone_dir = base_dir.join("clones").join(site_name);

        // 2. Check for changes (guard held; no concurrent git ops possible)
        let has_changes = match git::has_remote_changes(
            &clone_dir,
            &site.branch,
            git_timeout,
            site.git_depth.unwrap_or(git::GIT_DEPTH_DEFAULT),
        )
        .await
        {
            Ok(changed) => changed,
            Err(e) => {
                error!("[{site_name}] failed to check for changes: {e}");
                false
            }
        };

        if has_changes {
            // 3a. Keep guard alive; move it into the build pipeline
            info!("[{site_name}] new commits detected, triggering build");
            #[allow(clippy::large_futures)]
            crate::server::run_build_pipeline(
                state.clone(),
                site_name.clone(),
                site.clone(),
                guard,
            )
            .await;
        } else {
            // 3b. Explicit drop BEFORE sleep: release the lock immediately
            drop(guard);
        }

        // 4. Sleep (lock is NOT held here in either branch)
        tokio::select! {
            () = tokio::time::sleep(interval) => {}
            _ = cancel_rx.changed() => {
                info!("[{site_name}] polling cancelled");
                return;
            }
        }
    }
}

/// Calculate staggered initial delay to avoid all sites polling at once.
/// Uses a simple hash of the site name to distribute start times.
fn calculate_initial_delay(site_name: &str, interval: Duration) -> Duration {
    use std::collections::hash_map::DefaultHasher;

    let mut hasher = DefaultHasher::new();
    site_name.hash(&mut hasher);
    let hash = hasher.finish();

    // Spread across 0 to interval/2
    let max_delay_secs = interval.as_secs() / 2;
    let delay_secs = if max_delay_secs > 0 {
        hash % max_delay_secs
    } else {
        0
    };
    Duration::from_secs(delay_secs)
}

#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    #[test]
    fn initial_delay_zero_interval() {
        // interval=0 → max_delay_secs=0 → delay=0
        let delay = calculate_initial_delay("site", Duration::from_secs(0));
        assert_eq!(delay, Duration::from_secs(0));
    }

    #[test]
    fn initial_delay_one_second_interval() {
        // interval=1s → max_delay_secs=0 → delay=0
        let delay = calculate_initial_delay("site", Duration::from_secs(1));
        assert_eq!(delay, Duration::from_secs(0));
    }

    #[test]
    fn initial_delay_within_half_interval() {
        let interval = Duration::from_secs(600); // 10 min
        let delay = calculate_initial_delay("my-site", interval);
        // Must be < interval/2 (300s)
        assert!(delay < Duration::from_secs(300));
    }

    #[test]
    fn initial_delay_deterministic() {
        let interval = Duration::from_secs(600);
        let d1 = calculate_initial_delay("my-site", interval);
        let d2 = calculate_initial_delay("my-site", interval);
        assert_eq!(d1, d2);
    }

    #[test]
    fn initial_delay_different_sites_differ() {
        let interval = Duration::from_secs(3600);
        let d1 = calculate_initial_delay("site-alpha", interval);
        let d2 = calculate_initial_delay("site-beta", interval);
        // Different names should (almost certainly) produce different delays
        assert_ne!(d1, d2);
    }
}
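// Property-style sketch (an added check with arbitrary site names): because
// the delay is computed as `hash % (interval / 2)`, every site lands strictly
// below half the interval, whatever its name hashes to.
#[cfg(test)]
mod stagger_tests {
    use super::*;

    #[test]
    fn initial_delay_bounded_for_many_sites() {
        let interval = Duration::from_secs(7200); // 2 h
        for i in 0..100 {
            let name = format!("site-{i}");
            assert!(calculate_initial_delay(&name, interval) < interval / 2);
        }
    }
}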