diff options
Diffstat (limited to 'src/server.rs')
| -rw-r--r-- | src/server.rs | 1219 |
1 files changed, 1219 insertions, 0 deletions
diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 0000000..e31a1e4 --- /dev/null +++ b/src/server.rs @@ -0,0 +1,1219 @@ +use crate::build_guard::{BuildGuard, BuildScheduler}; +use crate::config::{Config, SiteConfig}; +use crate::polling::PollingManager; +use anyhow::Result; +use axum::{ + Json, Router, + extract::{DefaultBodyLimit, Path, State}, + http::{HeaderMap, StatusCode}, + response::IntoResponse, + routing::{get, post}, +}; +use governor::clock::DefaultClock; +use governor::state::keyed::DashMapStateStore; +use governor::{Quota, RateLimiter}; +use std::num::NonZeroU32; +use std::path::PathBuf; +use std::sync::Arc; +use subtle::ConstantTimeEq as _; +use tokio::net::TcpListener; +use tokio::signal::unix::{SignalKind, signal}; +use tokio::sync::RwLock; +use tracing::{error, info, warn}; + +#[derive(serde::Serialize)] +struct ErrorResponse { + error: &'static str, +} + +#[derive(serde::Serialize)] +struct QueuedResponse { + status: &'static str, +} + +#[derive(serde::Serialize)] +struct HealthResponse { + status: &'static str, +} + +fn error_response(status: StatusCode, error: &'static str) -> impl IntoResponse { + (status, Json(ErrorResponse { error })) +} + +type TokenRateLimiter = RateLimiter<String, DashMapStateStore<String>, DefaultClock>; + +#[derive(Clone)] +pub struct AppState { + pub config: Arc<RwLock<Config>>, + pub config_path: Arc<PathBuf>, + pub build_scheduler: Arc<BuildScheduler>, + pub rate_limiter: Arc<TokenRateLimiter>, + pub polling_manager: Arc<PollingManager>, +} + +pub fn create_router(state: AppState) -> Router { + Router::new() + .route("/health", get(health_handler)) + .route("/{site_name}", post(deploy_handler)) + .layer(DefaultBodyLimit::max(1024 * 1024)) // 1MB limit + .with_state(state) +} + +async fn health_handler() -> impl IntoResponse { + Json(HealthResponse { status: "ok" }) +} + +/// Extract Bearer token from Authorization header. +fn extract_bearer_token(headers: &HeaderMap) -> Option<&str> { + headers + .get("authorization") + .and_then(|v| v.to_str().ok()) + .and_then(|v| v.strip_prefix("Bearer ")) +} + +fn validate_token(provided: &str, expected: &str) -> bool { + let provided_bytes = provided.as_bytes(); + let expected_bytes = expected.as_bytes(); + + // Constant-time comparison - OWASP requirement + provided_bytes.ct_eq(expected_bytes).into() +} + +async fn deploy_handler( + State(state): State<AppState>, + Path(site_name): Path<String>, + headers: HeaderMap, +) -> impl IntoResponse { + info!(%site_name, "deployment request received"); + + // Find the site first to avoid information leakage + let site = { + let config = state.config.read().await; + if let Some(site) = config.find_site(&site_name) { + site.clone() + } else { + info!(%site_name, "site not found"); + return error_response(StatusCode::NOT_FOUND, "not_found").into_response(); + } + }; + + // Validate Bearer token (skip if auth disabled for this site) + if site.webhook_token.is_empty() { + // Auth disabled — rate limit by site name instead + if state.rate_limiter.check_key(&site_name).is_err() { + info!(%site_name, "rate limit exceeded"); + return error_response(StatusCode::TOO_MANY_REQUESTS, "rate_limit_exceeded") + .into_response(); + } + } else { + let Some(token) = extract_bearer_token(&headers) else { + info!(%site_name, "missing or malformed authorization header"); + return error_response(StatusCode::UNAUTHORIZED, "unauthorized").into_response(); + }; + + if !validate_token(token, &site.webhook_token) { + info!(%site_name, "invalid token"); + return error_response(StatusCode::UNAUTHORIZED, "unauthorized").into_response(); + } + + // Rate limit check (per token) + if state.rate_limiter.check_key(&token.to_owned()).is_err() { + info!(%site_name, "rate limit exceeded"); + return error_response(StatusCode::TOO_MANY_REQUESTS, "rate_limit_exceeded") + .into_response(); + } + } + + // Try immediate build + let Some(guard) = BuildGuard::try_acquire(site_name.clone(), &state.build_scheduler) else { + // Build in progress — try to queue + if state.build_scheduler.try_queue(&site_name) { + info!(%site_name, "build queued"); + return ( + StatusCode::ACCEPTED, + Json(QueuedResponse { status: "queued" }), + ) + .into_response(); + } + // Already queued — collapse + info!(%site_name, "build already queued, collapsing"); + return StatusCode::ACCEPTED.into_response(); + }; + + info!(%site_name, "deployment accepted"); + + // Spawn async build pipeline with queue drain loop + tokio::spawn(async move { + let mut current_site = site; + let mut current_guard = guard; + loop { + #[allow(clippy::large_futures)] + run_build_pipeline( + state.clone(), + site_name.clone(), + current_site.clone(), + current_guard, + ) + .await; + // Guard dropped here — build lock released + + if !state.build_scheduler.take_queued(&site_name) { + break; + } + info!(%site_name, "processing queued rebuild"); + let Some(new_site) = state.config.read().await.find_site(&site_name).cloned() else { + warn!(%site_name, "site removed from config, skipping queued rebuild"); + break; + }; + let Some(new_guard) = + BuildGuard::try_acquire(site_name.clone(), &state.build_scheduler) + else { + break; // someone else grabbed it + }; + current_site = new_site; + current_guard = new_guard; + } + }); + + StatusCode::ACCEPTED.into_response() +} + +/// Run the complete build pipeline: git sync → build → publish. +#[allow(clippy::large_futures)] +pub(crate) async fn run_build_pipeline( + state: AppState, + site_name: String, + site: SiteConfig, + _guard: BuildGuard, +) { + let (base_dir, log_dir, container_runtime, max_builds_to_keep, git_timeout) = { + let config = state.config.read().await; + ( + config.base_dir.clone(), + config.log_dir.clone(), + config.container_runtime.clone(), + config.max_builds_to_keep, + config + .git_timeout + .unwrap_or(crate::git::GIT_TIMEOUT_DEFAULT), + ) + }; + + match crate::pipeline::run_build( + &site_name, + &site, + &base_dir, + &log_dir, + &container_runtime, + max_builds_to_keep, + git_timeout, + false, + ) + .await + { + Ok(result) => { + info!( + %site_name, + build_dir = %result.build_dir.display(), + duration_secs = result.duration.as_secs(), + "pipeline completed" + ); + } + Err(e) => { + error!(%site_name, error = %e, "pipeline failed"); + } + } +} + +/// Setup SIGHUP signal handler for configuration hot-reload. +pub(crate) fn setup_sighup_handler(state: AppState) { + tokio::spawn(async move { + #[allow(clippy::expect_used)] // fatal: cannot proceed without signal handler + let mut sighup = + signal(SignalKind::hangup()).expect("failed to setup SIGHUP signal handler"); + + loop { + sighup.recv().await; + info!("SIGHUP received, reloading configuration"); + + let config_path = state.config_path.as_ref(); + match Config::load(config_path).await { + Ok(new_config) => { + let old_sites_count = state.config.read().await.sites.len(); + let new_sites_count = new_config.sites.len(); + + // Check for non-reloadable changes and capture old values + let (old_listen, old_base, old_log_dir, old_log_level) = { + let old_config = state.config.read().await; + if old_config.listen_address != new_config.listen_address { + warn!( + old = %old_config.listen_address, + new = %new_config.listen_address, + "listen_address changed but cannot be reloaded (restart required)" + ); + } + if old_config.base_dir != new_config.base_dir { + warn!( + old = %old_config.base_dir.display(), + new = %new_config.base_dir.display(), + "base_dir changed but cannot be reloaded (restart required)" + ); + } + if old_config.log_dir != new_config.log_dir { + warn!( + old = %old_config.log_dir.display(), + new = %new_config.log_dir.display(), + "log_dir changed but cannot be reloaded (restart required)" + ); + } + if old_config.log_level != new_config.log_level { + warn!( + old = %old_config.log_level, + new = %new_config.log_level, + "log_level changed but cannot be reloaded (restart required)" + ); + } + ( + old_config.listen_address.clone(), + old_config.base_dir.clone(), + old_config.log_dir.clone(), + old_config.log_level.clone(), + ) + }; + + // Preserve non-reloadable fields from the running config + let mut final_config = new_config; + final_config.listen_address = old_listen; + final_config.base_dir = old_base; + final_config.log_dir = old_log_dir; + final_config.log_level = old_log_level; + + // Apply the merged configuration + *state.config.write().await = final_config; + + // Restart polling tasks with new configuration + info!("restarting polling tasks"); + state.polling_manager.stop_all().await; + state.polling_manager.start_polling(state.clone()).await; + + info!( + old_sites_count, + new_sites_count, "configuration reloaded successfully" + ); + } + Err(e) => { + error!(error = %e, "failed to reload configuration, keeping current config"); + } + } + } + }); +} + +/// Start the server in production mode. +/// +/// # Errors +/// +/// Returns an error if the TCP listener cannot bind or the server encounters +/// a fatal I/O error. +/// +/// # Panics +/// +/// Panics if `rate_limit_per_minute` is zero. This is unreachable after +/// successful config validation. +pub async fn run(config: Config, config_path: PathBuf) -> Result<()> { + let addr = config.parsed_listen_address(); + + #[allow(clippy::expect_used)] // validated by Config::validate_rate_limit() + let quota = Quota::per_minute( + NonZeroU32::new(config.rate_limit_per_minute) + .expect("rate_limit_per_minute must be greater than 0"), + ); + let rate_limiter = Arc::new(RateLimiter::dashmap(quota)); + let polling_manager = Arc::new(PollingManager::new()); + + let state = AppState { + config: Arc::new(RwLock::new(config)), + config_path: Arc::new(config_path), + build_scheduler: Arc::new(BuildScheduler::new()), + rate_limiter, + polling_manager, + }; + + // Setup SIGHUP handler for configuration hot-reload + setup_sighup_handler(state.clone()); + + // Start polling tasks for sites with poll_interval configured + state.polling_manager.start_polling(state.clone()).await; + + let listener = TcpListener::bind(addr).await?; + info!(%addr, "server listening"); + + run_with_listener(state, listener, async { + let mut sigterm = signal(SignalKind::terminate()).expect("failed to setup SIGTERM handler"); + let mut sigint = signal(SignalKind::interrupt()).expect("failed to setup SIGINT handler"); + tokio::select! { + _ = sigterm.recv() => info!("received SIGTERM, shutting down"), + _ = sigint.recv() => info!("received SIGINT, shutting down"), + } + }) + .await +} + +/// Run the server on an already-bound listener with a custom shutdown signal. +/// +/// This is the core server loop used by both production (`run`) and integration tests. +/// Production delegates here after binding the listener and setting up SIGHUP handlers. +/// Tests call this via `test_support::run_server` with their own listener and shutdown channel. +pub(crate) async fn run_with_listener( + state: AppState, + listener: TcpListener, + shutdown_signal: impl std::future::Future<Output = ()> + Send + 'static, +) -> Result<()> { + let router = create_router(state); + + axum::serve(listener, router) + .with_graceful_shutdown(shutdown_signal) + .await?; + + Ok(()) +} + +#[cfg(test)] +#[allow(clippy::unwrap_used, clippy::indexing_slicing, clippy::expect_used)] +mod tests { + use super::*; + use crate::config::{BuildOverrides, SiteConfig}; + use axum::body::Body; + use axum::http::{Request, StatusCode}; + use axum::response::Response; + use std::path::PathBuf; + use tower::ServiceExt as _; + + fn test_state(config: Config) -> AppState { + test_state_with_rate_limit(config, 1000) // High limit for most tests + } + + fn test_state_with_rate_limit(config: Config, rate_limit: u32) -> AppState { + let quota = Quota::per_minute(NonZeroU32::new(rate_limit).unwrap()); + AppState { + config: Arc::new(RwLock::new(config)), + config_path: Arc::new(PathBuf::from("witryna.toml")), + build_scheduler: Arc::new(BuildScheduler::new()), + rate_limiter: Arc::new(RateLimiter::dashmap(quota)), + polling_manager: Arc::new(PollingManager::new()), + } + } + + fn test_config() -> Config { + Config { + listen_address: "127.0.0.1:8080".to_owned(), + container_runtime: "podman".to_owned(), + base_dir: PathBuf::from("/var/lib/witryna"), + log_dir: PathBuf::from("/var/log/witryna"), + log_level: "info".to_owned(), + rate_limit_per_minute: 10, + max_builds_to_keep: 5, + git_timeout: None, + sites: vec![], + } + } + + fn test_config_with_sites() -> Config { + Config { + sites: vec![SiteConfig { + name: "my-site".to_owned(), + repo_url: "https://github.com/user/my-site.git".to_owned(), + branch: "main".to_owned(), + webhook_token: "secret-token".to_owned(), + webhook_token_file: None, + + build_overrides: BuildOverrides::default(), + poll_interval: None, + build_timeout: None, + cache_dirs: None, + post_deploy: None, + env: None, + container_memory: None, + container_cpus: None, + container_pids_limit: None, + container_network: "none".to_owned(), + git_depth: None, + container_workdir: None, + config_file: None, + }], + ..test_config() + } + } + + #[tokio::test] + async fn health_endpoint_returns_ok() { + let state = test_state(test_config_with_sites()); + let router = create_router(state); + + let response: Response = router + .oneshot( + Request::builder() + .uri("/health") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + let body = axum::body::to_bytes(response.into_body(), 1024) + .await + .unwrap(); + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(json["status"], "ok"); + } + + #[tokio::test] + async fn unknown_site_post_returns_not_found() { + let state = test_state(test_config()); + let router = create_router(state); + + let response: Response = router + .oneshot( + Request::builder() + .method("POST") + .uri("/nonexistent") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::NOT_FOUND); + let body = axum::body::to_bytes(response.into_body(), 1024) + .await + .unwrap(); + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(json["error"], "not_found"); + } + + #[tokio::test] + async fn deploy_known_site_with_valid_token_returns_accepted() { + let state = test_state(test_config_with_sites()); + let router = create_router(state); + + let response: Response = router + .oneshot( + Request::builder() + .method("POST") + .uri("/my-site") + .header("Authorization", "Bearer secret-token") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::ACCEPTED); + let body = axum::body::to_bytes(response.into_body(), 1024) + .await + .unwrap(); + assert!(body.is_empty()); + } + + #[tokio::test] + async fn deploy_missing_auth_header_returns_unauthorized() { + let state = test_state(test_config_with_sites()); + let router = create_router(state); + + let response: Response = router + .oneshot( + Request::builder() + .method("POST") + .uri("/my-site") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::UNAUTHORIZED); + let body = axum::body::to_bytes(response.into_body(), 1024) + .await + .unwrap(); + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(json["error"], "unauthorized"); + } + + #[tokio::test] + async fn deploy_invalid_token_returns_unauthorized() { + let state = test_state(test_config_with_sites()); + let router = create_router(state); + + let response: Response = router + .oneshot( + Request::builder() + .method("POST") + .uri("/my-site") + .header("Authorization", "Bearer wrong-token") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::UNAUTHORIZED); + let body = axum::body::to_bytes(response.into_body(), 1024) + .await + .unwrap(); + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(json["error"], "unauthorized"); + } + + #[tokio::test] + async fn deploy_malformed_auth_header_returns_unauthorized() { + let state = test_state(test_config_with_sites()); + let router = create_router(state); + + // Test without "Bearer " prefix + let response: Response = router + .oneshot( + Request::builder() + .method("POST") + .uri("/my-site") + .header("Authorization", "secret-token") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::UNAUTHORIZED); + let body = axum::body::to_bytes(response.into_body(), 1024) + .await + .unwrap(); + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(json["error"], "unauthorized"); + } + + #[tokio::test] + async fn deploy_basic_auth_returns_unauthorized() { + let state = test_state(test_config_with_sites()); + let router = create_router(state); + + // Test Basic auth instead of Bearer + let response: Response = router + .oneshot( + Request::builder() + .method("POST") + .uri("/my-site") + .header("Authorization", "Basic dXNlcjpwYXNz") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::UNAUTHORIZED); + let body = axum::body::to_bytes(response.into_body(), 1024) + .await + .unwrap(); + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(json["error"], "unauthorized"); + } + + #[tokio::test] + async fn deploy_get_method_not_allowed() { + let state = test_state(test_config_with_sites()); + let router = create_router(state); + + let response: Response = router + .oneshot( + Request::builder() + .method("GET") + .uri("/my-site") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::METHOD_NOT_ALLOWED); + } + + #[tokio::test] + async fn deploy_unknown_site_with_token_returns_not_found() { + let state = test_state(test_config_with_sites()); + let router = create_router(state); + + let response: Response = router + .oneshot( + Request::builder() + .method("POST") + .uri("/unknown-site") + .header("Authorization", "Bearer any-token") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + // Returns 404 before checking token (site lookup first) + assert_eq!(response.status(), StatusCode::NOT_FOUND); + let body = axum::body::to_bytes(response.into_body(), 1024) + .await + .unwrap(); + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(json["error"], "not_found"); + } + + fn test_config_with_two_sites() -> Config { + Config { + listen_address: "127.0.0.1:8080".to_owned(), + container_runtime: "podman".to_owned(), + base_dir: PathBuf::from("/var/lib/witryna"), + log_dir: PathBuf::from("/var/log/witryna"), + log_level: "info".to_owned(), + rate_limit_per_minute: 10, + max_builds_to_keep: 5, + git_timeout: None, + sites: vec![ + SiteConfig { + name: "site-one".to_owned(), + repo_url: "https://github.com/user/site-one.git".to_owned(), + branch: "main".to_owned(), + webhook_token: "token-one".to_owned(), + webhook_token_file: None, + + build_overrides: BuildOverrides::default(), + poll_interval: None, + build_timeout: None, + cache_dirs: None, + post_deploy: None, + env: None, + container_memory: None, + container_cpus: None, + container_pids_limit: None, + container_network: "none".to_owned(), + git_depth: None, + container_workdir: None, + config_file: None, + }, + SiteConfig { + name: "site-two".to_owned(), + repo_url: "https://github.com/user/site-two.git".to_owned(), + branch: "main".to_owned(), + webhook_token: "token-two".to_owned(), + webhook_token_file: None, + + build_overrides: BuildOverrides::default(), + poll_interval: None, + build_timeout: None, + cache_dirs: None, + post_deploy: None, + env: None, + container_memory: None, + container_cpus: None, + container_pids_limit: None, + container_network: "none".to_owned(), + git_depth: None, + container_workdir: None, + config_file: None, + }, + ], + } + } + + #[tokio::test] + async fn deploy_concurrent_same_site_gets_queued() { + let state = test_state(test_config_with_sites()); + let router = create_router(state.clone()); + + // First request should succeed (immediate build) + let response1: Response = router + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri("/my-site") + .header("Authorization", "Bearer secret-token") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response1.status(), StatusCode::ACCEPTED); + let body1 = axum::body::to_bytes(response1.into_body(), 1024) + .await + .unwrap(); + assert!(body1.is_empty()); + + // Second request to same site should be queued (202 with body) + let response2: Response = router + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri("/my-site") + .header("Authorization", "Bearer secret-token") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response2.status(), StatusCode::ACCEPTED); + let body2 = axum::body::to_bytes(response2.into_body(), 1024) + .await + .unwrap(); + let json: serde_json::Value = serde_json::from_slice(&body2).unwrap(); + assert_eq!(json["status"], "queued"); + + // Third request should be collapsed (202, no body) + let response3: Response = router + .oneshot( + Request::builder() + .method("POST") + .uri("/my-site") + .header("Authorization", "Bearer secret-token") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response3.status(), StatusCode::ACCEPTED); + let body3 = axum::body::to_bytes(response3.into_body(), 1024) + .await + .unwrap(); + assert!(body3.is_empty()); + } + + #[tokio::test] + async fn deploy_concurrent_different_sites_both_succeed() { + let state = test_state(test_config_with_two_sites()); + let router = create_router(state.clone()); + + // First site deployment + let response1: Response = router + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri("/site-one") + .header("Authorization", "Bearer token-one") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response1.status(), StatusCode::ACCEPTED); + + // Second site deployment should also succeed + let response2: Response = router + .oneshot( + Request::builder() + .method("POST") + .uri("/site-two") + .header("Authorization", "Bearer token-two") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response2.status(), StatusCode::ACCEPTED); + } + + #[tokio::test] + async fn deploy_site_in_progress_checked_after_auth() { + let state = test_state(test_config_with_sites()); + + // Pre-mark site as building + state + .build_scheduler + .in_progress + .insert("my-site".to_owned()); + + let router = create_router(state); + + // Request with wrong token should return 401 (auth checked before build status) + let response: Response = router + .oneshot( + Request::builder() + .method("POST") + .uri("/my-site") + .header("Authorization", "Bearer wrong-token") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::UNAUTHORIZED); + let body = axum::body::to_bytes(response.into_body(), 1024) + .await + .unwrap(); + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(json["error"], "unauthorized"); + } + + #[tokio::test] + async fn rate_limit_exceeded_returns_429() { + // Create state with rate limit of 2 per minute + let state = test_state_with_rate_limit(test_config_with_sites(), 2); + let router = create_router(state); + + // First request should succeed + let response1: Response = router + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri("/my-site") + .header("Authorization", "Bearer secret-token") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response1.status(), StatusCode::ACCEPTED); + + // Second request should succeed (or 409 if build in progress) + let response2: Response = router + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri("/my-site") + .header("Authorization", "Bearer secret-token") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + // Could be 202 or 409 depending on timing + assert!( + response2.status() == StatusCode::ACCEPTED + || response2.status() == StatusCode::CONFLICT + ); + + // Third request should hit rate limit + let response3: Response = router + .oneshot( + Request::builder() + .method("POST") + .uri("/my-site") + .header("Authorization", "Bearer secret-token") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response3.status(), StatusCode::TOO_MANY_REQUESTS); + let body = axum::body::to_bytes(response3.into_body(), 1024) + .await + .unwrap(); + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(json["error"], "rate_limit_exceeded"); + } + + #[tokio::test] + async fn rate_limit_different_tokens_independent() { + // Create state with rate limit of 1 per minute + let state = test_state_with_rate_limit(test_config_with_two_sites(), 1); + let router = create_router(state); + + // First request with token-one should succeed + let response1: Response = router + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri("/site-one") + .header("Authorization", "Bearer token-one") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response1.status(), StatusCode::ACCEPTED); + + // Second request with token-one should hit rate limit + let response2: Response = router + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri("/site-one") + .header("Authorization", "Bearer token-one") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response2.status(), StatusCode::TOO_MANY_REQUESTS); + let body = axum::body::to_bytes(response2.into_body(), 1024) + .await + .unwrap(); + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(json["error"], "rate_limit_exceeded"); + + // Request with different token should still succeed + let response3: Response = router + .oneshot( + Request::builder() + .method("POST") + .uri("/site-two") + .header("Authorization", "Bearer token-two") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response3.status(), StatusCode::ACCEPTED); + } + + #[tokio::test] + async fn rate_limit_checked_after_auth() { + // Create state with rate limit of 1 per minute + let state = test_state_with_rate_limit(test_config_with_sites(), 1); + let router = create_router(state); + + // First valid request exhausts rate limit + let response1: Response = router + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri("/my-site") + .header("Authorization", "Bearer secret-token") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response1.status(), StatusCode::ACCEPTED); + + // Request with invalid token should return 401, not 429 + // (auth is checked before rate limit) + let response2: Response = router + .oneshot( + Request::builder() + .method("POST") + .uri("/my-site") + .header("Authorization", "Bearer wrong-token") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response2.status(), StatusCode::UNAUTHORIZED); + let body = axum::body::to_bytes(response2.into_body(), 1024) + .await + .unwrap(); + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(json["error"], "unauthorized"); + } + + #[tokio::test] + async fn sighup_preserves_non_reloadable_fields() { + // Original config with specific non-reloadable values + let original = Config { + listen_address: "127.0.0.1:8080".to_owned(), + container_runtime: "podman".to_owned(), + base_dir: PathBuf::from("/var/lib/witryna"), + log_dir: PathBuf::from("/var/log/witryna"), + log_level: "info".to_owned(), + rate_limit_per_minute: 10, + max_builds_to_keep: 5, + git_timeout: None, + sites: vec![SiteConfig { + name: "old-site".to_owned(), + repo_url: "https://example.com/old.git".to_owned(), + branch: "main".to_owned(), + webhook_token: "old-token".to_owned(), + webhook_token_file: None, + + build_overrides: BuildOverrides::default(), + poll_interval: None, + build_timeout: None, + cache_dirs: None, + post_deploy: None, + env: None, + container_memory: None, + container_cpus: None, + container_pids_limit: None, + container_network: "none".to_owned(), + git_depth: None, + container_workdir: None, + config_file: None, + }], + }; + + let state = test_state(original); + + // Simulate a new config loaded from disk with changed non-reloadable + // AND reloadable fields + let new_config = Config { + listen_address: "0.0.0.0:9999".to_owned(), + container_runtime: "docker".to_owned(), + base_dir: PathBuf::from("/tmp/new-base"), + log_dir: PathBuf::from("/tmp/new-logs"), + log_level: "debug".to_owned(), + rate_limit_per_minute: 20, + max_builds_to_keep: 10, + git_timeout: None, + sites: vec![SiteConfig { + name: "new-site".to_owned(), + repo_url: "https://example.com/new.git".to_owned(), + branch: "develop".to_owned(), + webhook_token: "new-token".to_owned(), + webhook_token_file: None, + + build_overrides: BuildOverrides::default(), + poll_interval: None, + build_timeout: None, + cache_dirs: None, + post_deploy: None, + env: None, + container_memory: None, + container_cpus: None, + container_pids_limit: None, + container_network: "none".to_owned(), + git_depth: None, + container_workdir: None, + config_file: None, + }], + }; + + // Apply the same merge logic used in setup_sighup_handler + let (old_listen, old_base, old_log_dir, old_log_level) = { + let old_config = state.config.read().await; + ( + old_config.listen_address.clone(), + old_config.base_dir.clone(), + old_config.log_dir.clone(), + old_config.log_level.clone(), + ) + }; + + let mut final_config = new_config; + final_config.listen_address = old_listen; + final_config.base_dir = old_base; + final_config.log_dir = old_log_dir; + final_config.log_level = old_log_level; + + *state.config.write().await = final_config; + + // Verify non-reloadable fields are preserved + let config = state.config.read().await; + assert_eq!(config.listen_address, "127.0.0.1:8080"); + assert_eq!(config.base_dir, PathBuf::from("/var/lib/witryna")); + assert_eq!(config.log_dir, PathBuf::from("/var/log/witryna")); + assert_eq!(config.log_level, "info"); + + // Verify reloadable fields are updated + assert_eq!(config.container_runtime, "docker"); + assert_eq!(config.rate_limit_per_minute, 20); + assert_eq!(config.max_builds_to_keep, 10); + assert_eq!(config.sites.len(), 1); + assert_eq!(config.sites[0].name, "new-site"); + } + + fn test_config_with_disabled_auth() -> Config { + Config { + sites: vec![SiteConfig { + name: "open-site".to_owned(), + repo_url: "https://github.com/user/open-site.git".to_owned(), + branch: "main".to_owned(), + webhook_token: String::new(), + webhook_token_file: None, + build_overrides: BuildOverrides::default(), + poll_interval: None, + build_timeout: None, + cache_dirs: None, + post_deploy: None, + env: None, + container_memory: None, + container_cpus: None, + container_pids_limit: None, + container_network: "none".to_owned(), + git_depth: None, + container_workdir: None, + config_file: None, + }], + ..test_config() + } + } + + #[tokio::test] + async fn deploy_disabled_auth_returns_accepted() { + let state = test_state(test_config_with_disabled_auth()); + let router = create_router(state); + + // Request without Authorization header should succeed + let response: Response = router + .oneshot( + Request::builder() + .method("POST") + .uri("/open-site") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::ACCEPTED); + } + + #[tokio::test] + async fn deploy_disabled_auth_ignores_token() { + let state = test_state(test_config_with_disabled_auth()); + let router = create_router(state); + + // Request WITH a Bearer token should also succeed (token ignored) + let response: Response = router + .oneshot( + Request::builder() + .method("POST") + .uri("/open-site") + .header("Authorization", "Bearer any-token") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::ACCEPTED); + } + + #[tokio::test] + async fn deploy_disabled_auth_rate_limited_by_site_name() { + let state = test_state_with_rate_limit(test_config_with_disabled_auth(), 1); + let router = create_router(state); + + // First request should succeed + let response1: Response = router + .clone() + .oneshot( + Request::builder() + .method("POST") + .uri("/open-site") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response1.status(), StatusCode::ACCEPTED); + + // Second request should hit rate limit (keyed by site name) + let response2: Response = router + .oneshot( + Request::builder() + .method("POST") + .uri("/open-site") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + assert_eq!(response2.status(), StatusCode::TOO_MANY_REQUESTS); + let body = axum::body::to_bytes(response2.into_body(), 1024) + .await + .unwrap(); + let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(json["error"], "rate_limit_exceeded"); + } +} |
