//! Polling manager for periodic repository change detection. //! //! Spawns background tasks for sites with `poll_interval` configured. //! Integrates with SIGHUP reload to restart polling tasks on config change. use crate::build_guard::BuildGuard; use crate::config::SiteConfig; use crate::git; use crate::server::AppState; use std::collections::HashMap; use std::hash::{Hash as _, Hasher as _}; use std::sync::Arc; use std::time::Duration; use tokio::sync::RwLock; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info}; /// Manages polling tasks for all sites. pub struct PollingManager { /// Map of `site_name` -> cancellation token for active polling tasks tasks: Arc>>, } impl PollingManager { #[must_use] pub fn new() -> Self { Self { tasks: Arc::new(RwLock::new(HashMap::new())), } } /// Start polling tasks for sites with `poll_interval` configured. /// Call this on startup and after SIGHUP reload. pub async fn start_polling(&self, state: AppState) { let config = state.config.read().await; for site in &config.sites { if let Some(interval) = site.poll_interval { self.spawn_poll_task(state.clone(), site.clone(), interval) .await; } } } /// Stop all currently running polling tasks. /// Call this before starting new tasks on SIGHUP. pub async fn stop_all(&self) { let mut tasks = self.tasks.write().await; for (site_name, token) in tasks.drain() { info!(site = %site_name, "stopping polling task"); token.cancel(); } } /// Spawn a single polling task for a site. async fn spawn_poll_task(&self, state: AppState, site: SiteConfig, interval: Duration) { let site_name = site.name.clone(); let token = CancellationToken::new(); // Store the cancellation token { let mut tasks = self.tasks.write().await; tasks.insert(site_name.clone(), token.clone()); } info!( site = %site_name, interval_secs = interval.as_secs(), "starting polling task" ); // Spawn the polling loop let tasks = Arc::clone(&self.tasks); tokio::spawn(async move { #[allow(clippy::large_futures)] poll_loop(state, site, interval, token.clone()).await; // Remove from active tasks when done tasks.write().await.remove(&site_name); debug!(site = %site_name, "polling task ended"); }); } } impl Default for PollingManager { fn default() -> Self { Self::new() } } /// The main polling loop for a single site. async fn poll_loop( state: AppState, site: SiteConfig, interval: Duration, cancel_token: CancellationToken, ) { let site_name = &site.name; // Initial delay before first poll (avoid thundering herd on startup) let initial_delay = calculate_initial_delay(site_name, interval); debug!(site = %site_name, delay_secs = initial_delay.as_secs(), "initial poll delay"); tokio::select! { () = tokio::time::sleep(initial_delay) => {} () = cancel_token.cancelled() => return, } loop { debug!(site = %site_name, "polling for changes"); // 1. Acquire build lock before any git operation let Some(guard) = BuildGuard::try_acquire(site_name.clone(), &state.build_scheduler) else { debug!(site = %site_name, "build in progress, skipping poll cycle"); tokio::select! { () = tokio::time::sleep(interval) => {} () = cancel_token.cancelled() => { info!(site = %site_name, "polling cancelled"); return; } } continue; }; // Get current config (might have changed via SIGHUP) let (base_dir, git_timeout) = { let config = state.config.read().await; ( config.base_dir.clone(), config.git_timeout.unwrap_or(git::GIT_TIMEOUT_DEFAULT), ) }; let clone_dir = base_dir.join("clones").join(site_name); // 2. Check for changes (guard held — no concurrent git ops possible) let has_changes = match git::has_remote_changes( &clone_dir, &site.branch, git_timeout, site.git_depth.unwrap_or(git::GIT_DEPTH_DEFAULT), ) .await { Ok(changed) => changed, Err(e) => { error!(site = %site_name, error = %e, "failed to check for changes"); false } }; if has_changes { // 3a. Keep guard alive — move into build pipeline info!(site = %site_name, "new commits detected, triggering build"); #[allow(clippy::large_futures)] crate::server::run_build_pipeline( state.clone(), site_name.clone(), site.clone(), guard, ) .await; } else { // 3b. Explicit drop BEFORE sleep — release lock immediately drop(guard); } // 4. Sleep (lock is NOT held here in either branch) tokio::select! { () = tokio::time::sleep(interval) => {} () = cancel_token.cancelled() => { info!(site = %site_name, "polling cancelled"); return; } } } } /// Calculate staggered initial delay to avoid all sites polling at once. /// Uses a simple hash of the site name to distribute start times. fn calculate_initial_delay(site_name: &str, interval: Duration) -> Duration { use std::collections::hash_map::DefaultHasher; let mut hasher = DefaultHasher::new(); site_name.hash(&mut hasher); let hash = hasher.finish(); // Spread across 0 to interval/2 let max_delay_secs = interval.as_secs() / 2; let delay_secs = if max_delay_secs > 0 { hash % max_delay_secs } else { 0 }; Duration::from_secs(delay_secs) } #[cfg(test)] #[allow(clippy::unwrap_used)] mod tests { use super::*; #[test] fn initial_delay_zero_interval() { // interval=0 → max_delay_secs=0 → delay=0 let delay = calculate_initial_delay("site", Duration::from_secs(0)); assert_eq!(delay, Duration::from_secs(0)); } #[test] fn initial_delay_one_second_interval() { // interval=1s → max_delay_secs=0 → delay=0 let delay = calculate_initial_delay("site", Duration::from_secs(1)); assert_eq!(delay, Duration::from_secs(0)); } #[test] fn initial_delay_within_half_interval() { let interval = Duration::from_secs(600); // 10 min let delay = calculate_initial_delay("my-site", interval); // Must be < interval/2 (300s) assert!(delay < Duration::from_secs(300)); } #[test] fn initial_delay_deterministic() { let interval = Duration::from_secs(600); let d1 = calculate_initial_delay("my-site", interval); let d2 = calculate_initial_delay("my-site", interval); assert_eq!(d1, d2); } #[test] fn initial_delay_different_sites_differ() { let interval = Duration::from_secs(3600); let d1 = calculate_initial_delay("site-alpha", interval); let d2 = calculate_initial_delay("site-beta", interval); // Different names should (almost certainly) produce different delays assert_ne!(d1, d2); } }