summaryrefslogtreecommitdiff
path: root/src/polling.rs
diff options
context:
space:
mode:
authorDawid Rycerz <dawid@rycerz.xyz>2026-02-15 21:27:00 +0100
committerDawid Rycerz <dawid@rycerz.xyz>2026-02-15 21:27:00 +0100
commitce0dbf6b249956700c6a1705bf4ad85a09d53e8c (patch)
treed7c3236807cfbf75d7f3a355eb5df5a5e2cc4ad7 /src/polling.rs
parent064a1d01c5c14f5ecc032fa9b8346a4a88b893f6 (diff)
feat: witryna 0.2.0HEADv0.2.0main
Switch, cleanup, and status CLI commands. Persistent build state via state.json. Post-deploy hooks on success and failure with WITRYNA_BUILD_STATUS. Dependency diet (axum→tiny_http, clap→argh, tracing→log). Drop built-in rate limiting. Nix flake with NixOS module. Arch Linux PKGBUILD. Centralized version management. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Diffstat (limited to 'src/polling.rs')
-rw-r--r--src/polling.rs72
1 files changed, 41 insertions, 31 deletions
diff --git a/src/polling.rs b/src/polling.rs
index 6c25326..c06cfad 100644
--- a/src/polling.rs
+++ b/src/polling.rs
@@ -7,18 +7,17 @@ use crate::build_guard::BuildGuard;
use crate::config::SiteConfig;
use crate::git;
use crate::server::AppState;
+use log::{debug, error, info};
use std::collections::HashMap;
use std::hash::{Hash as _, Hasher as _};
use std::sync::Arc;
use std::time::Duration;
-use tokio::sync::RwLock;
-use tokio_util::sync::CancellationToken;
-use tracing::{debug, error, info};
+use tokio::sync::{RwLock, watch};
/// Manages polling tasks for all sites.
pub struct PollingManager {
- /// Map of `site_name` -> cancellation token for active polling tasks
- tasks: Arc<RwLock<HashMap<String, CancellationToken>>>,
+ /// Map of `site_name` -> cancellation sender for active polling tasks
+ tasks: Arc<RwLock<HashMap<String, watch::Sender<()>>>>,
}
impl PollingManager {
@@ -31,10 +30,19 @@ impl PollingManager {
/// Start polling tasks for sites with `poll_interval` configured.
/// Call this on startup and after SIGHUP reload.
+ ///
+ /// # Panics
+ ///
+ /// Panics if the config `RwLock` is poisoned.
pub async fn start_polling(&self, state: AppState) {
- let config = state.config.read().await;
-
- for site in &config.sites {
+ let sites: Vec<_> = state
+ .config
+ .read()
+ .expect("config lock poisoned")
+ .sites
+ .clone();
+
+ for site in &sites {
if let Some(interval) = site.poll_interval {
self.spawn_poll_task(state.clone(), site.clone(), interval)
.await;
@@ -47,38 +55,37 @@ impl PollingManager {
pub async fn stop_all(&self) {
let mut tasks = self.tasks.write().await;
- for (site_name, token) in tasks.drain() {
- info!(site = %site_name, "stopping polling task");
- token.cancel();
+ for (site_name, tx) in tasks.drain() {
+ info!("[{site_name}] stopping polling task");
+ let _ = tx.send(());
}
}
/// Spawn a single polling task for a site.
async fn spawn_poll_task(&self, state: AppState, site: SiteConfig, interval: Duration) {
let site_name = site.name.clone();
- let token = CancellationToken::new();
+ let (cancel_tx, cancel_rx) = watch::channel(());
- // Store the cancellation token
+ // Store the cancellation sender
{
let mut tasks = self.tasks.write().await;
- tasks.insert(site_name.clone(), token.clone());
+ tasks.insert(site_name.clone(), cancel_tx);
}
info!(
- site = %site_name,
- interval_secs = interval.as_secs(),
- "starting polling task"
+ "[{site_name}] starting polling task: interval_secs={}",
+ interval.as_secs()
);
// Spawn the polling loop
let tasks = Arc::clone(&self.tasks);
tokio::spawn(async move {
#[allow(clippy::large_futures)]
- poll_loop(state, site, interval, token.clone()).await;
+ poll_loop(state, site, interval, cancel_rx).await;
// Remove from active tasks when done
tasks.write().await.remove(&site_name);
- debug!(site = %site_name, "polling task ended");
+ debug!("[{site_name}] polling task ended");
});
}
}
@@ -94,29 +101,32 @@ async fn poll_loop(
state: AppState,
site: SiteConfig,
interval: Duration,
- cancel_token: CancellationToken,
+ mut cancel_rx: watch::Receiver<()>,
) {
let site_name = &site.name;
// Initial delay before first poll (avoid thundering herd on startup)
let initial_delay = calculate_initial_delay(site_name, interval);
- debug!(site = %site_name, delay_secs = initial_delay.as_secs(), "initial poll delay");
+ debug!(
+ "[{site_name}] initial poll delay: {} secs",
+ initial_delay.as_secs()
+ );
tokio::select! {
() = tokio::time::sleep(initial_delay) => {}
- () = cancel_token.cancelled() => return,
+ _ = cancel_rx.changed() => return,
}
loop {
- debug!(site = %site_name, "polling for changes");
+ debug!("[{site_name}] polling for changes");
// 1. Acquire build lock before any git operation
let Some(guard) = BuildGuard::try_acquire(site_name.clone(), &state.build_scheduler) else {
- debug!(site = %site_name, "build in progress, skipping poll cycle");
+ debug!("[{site_name}] build in progress, skipping poll cycle");
tokio::select! {
() = tokio::time::sleep(interval) => {}
- () = cancel_token.cancelled() => {
- info!(site = %site_name, "polling cancelled");
+ _ = cancel_rx.changed() => {
+ info!("[{site_name}] polling cancelled");
return;
}
}
@@ -125,7 +135,7 @@ async fn poll_loop(
// Get current config (might have changed via SIGHUP)
let (base_dir, git_timeout) = {
- let config = state.config.read().await;
+ let config = state.config.read().expect("config lock poisoned");
(
config.base_dir.clone(),
config.git_timeout.unwrap_or(git::GIT_TIMEOUT_DEFAULT),
@@ -144,14 +154,14 @@ async fn poll_loop(
{
Ok(changed) => changed,
Err(e) => {
- error!(site = %site_name, error = %e, "failed to check for changes");
+ error!("[{site_name}] failed to check for changes: {e}");
false
}
};
if has_changes {
// 3a. Keep guard alive — move into build pipeline
- info!(site = %site_name, "new commits detected, triggering build");
+ info!("[{site_name}] new commits detected, triggering build");
#[allow(clippy::large_futures)]
crate::server::run_build_pipeline(
state.clone(),
@@ -168,8 +178,8 @@ async fn poll_loop(
// 4. Sleep (lock is NOT held here in either branch)
tokio::select! {
() = tokio::time::sleep(interval) => {}
- () = cancel_token.cancelled() => {
- info!(site = %site_name, "polling cancelled");
+ _ = cancel_rx.changed() => {
+ info!("[{site_name}] polling cancelled");
return;
}
}