diff options
| author | Dawid Rycerz <dawid@rycerz.xyz> | 2026-02-15 21:27:00 +0100 |
|---|---|---|
| committer | Dawid Rycerz <dawid@rycerz.xyz> | 2026-02-15 21:27:00 +0100 |
| commit | ce0dbf6b249956700c6a1705bf4ad85a09d53e8c (patch) | |
| tree | d7c3236807cfbf75d7f3a355eb5df5a5e2cc4ad7 /src/server.rs | |
| parent | 064a1d01c5c14f5ecc032fa9b8346a4a88b893f6 (diff) | |
Switch, cleanup, and status CLI commands. Persistent build state via
state.json. Post-deploy hooks on success and failure with
WITRYNA_BUILD_STATUS. Dependency diet (axum→tiny_http, clap→argh,
tracing→log). Drop built-in rate limiting. Nix flake with NixOS module.
Arch Linux PKGBUILD. Centralized version management.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Diffstat (limited to 'src/server.rs')
| -rw-r--r-- | src/server.rs | 1026 |
1 files changed, 381 insertions, 645 deletions
diff --git a/src/server.rs b/src/server.rs index e31a1e4..a2aef5c 100644 --- a/src/server.rs +++ b/src/server.rs @@ -2,24 +2,11 @@ use crate::build_guard::{BuildGuard, BuildScheduler}; use crate::config::{Config, SiteConfig}; use crate::polling::PollingManager; use anyhow::Result; -use axum::{ - Json, Router, - extract::{DefaultBodyLimit, Path, State}, - http::{HeaderMap, StatusCode}, - response::IntoResponse, - routing::{get, post}, -}; -use governor::clock::DefaultClock; -use governor::state::keyed::DashMapStateStore; -use governor::{Quota, RateLimiter}; -use std::num::NonZeroU32; +use log::{error, info, warn}; use std::path::PathBuf; -use std::sync::Arc; -use subtle::ConstantTimeEq as _; -use tokio::net::TcpListener; +use std::sync::{Arc, RwLock}; +use tiny_http::{Header, Method, Request, Response, Server}; use tokio::signal::unix::{SignalKind, signal}; -use tokio::sync::RwLock; -use tracing::{error, info, warn}; #[derive(serde::Serialize)] struct ErrorResponse { @@ -36,114 +23,118 @@ struct HealthResponse { status: &'static str, } -fn error_response(status: StatusCode, error: &'static str) -> impl IntoResponse { - (status, Json(ErrorResponse { error })) +fn json_response(status: u16, body: &str) -> Response<std::io::Cursor<Vec<u8>>> { + let data = body.as_bytes().to_vec(); + Response::from_data(data) + .with_status_code(status) + .with_header(Header::from_bytes("Content-Type", "application/json").expect("valid header")) } -type TokenRateLimiter = RateLimiter<String, DashMapStateStore<String>, DefaultClock>; +fn empty_response(status: u16) -> Response<std::io::Empty> { + Response::empty(status) +} #[derive(Clone)] pub struct AppState { pub config: Arc<RwLock<Config>>, pub config_path: Arc<PathBuf>, pub build_scheduler: Arc<BuildScheduler>, - pub rate_limiter: Arc<TokenRateLimiter>, pub polling_manager: Arc<PollingManager>, } -pub fn create_router(state: AppState) -> Router { - Router::new() - .route("/health", get(health_handler)) - .route("/{site_name}", post(deploy_handler)) - .layer(DefaultBodyLimit::max(1024 * 1024)) // 1MB limit - .with_state(state) -} - -async fn health_handler() -> impl IntoResponse { - Json(HealthResponse { status: "ok" }) -} - -/// Extract Bearer token from Authorization header. -fn extract_bearer_token(headers: &HeaderMap) -> Option<&str> { +/// Extract Bearer token from `tiny_http` headers. +fn extract_bearer_token(headers: &[Header]) -> Option<&str> { headers - .get("authorization") - .and_then(|v| v.to_str().ok()) - .and_then(|v| v.strip_prefix("Bearer ")) + .iter() + .find(|h| h.field.equiv("Authorization")) + .and_then(|h| h.value.as_str().strip_prefix("Bearer ")) } fn validate_token(provided: &str, expected: &str) -> bool { - let provided_bytes = provided.as_bytes(); - let expected_bytes = expected.as_bytes(); + let a = provided.as_bytes(); + let b = expected.as_bytes(); + + // Constant-time comparison — OWASP requirement. + // Length check is not constant-time, but token length is not secret + // (same early-return approach as subtle::ConstantTimeEq for slices). + if a.len() != b.len() { + return false; + } - // Constant-time comparison - OWASP requirement - provided_bytes.ct_eq(expected_bytes).into() + let mut acc: u8 = 0; + for (x, y) in a.iter().zip(b.iter()) { + acc |= x ^ y; + } + acc == 0 } -async fn deploy_handler( - State(state): State<AppState>, - Path(site_name): Path<String>, - headers: HeaderMap, -) -> impl IntoResponse { - info!(%site_name, "deployment request received"); - - // Find the site first to avoid information leakage - let site = { - let config = state.config.read().await; - if let Some(site) = config.find_site(&site_name) { - site.clone() - } else { - info!(%site_name, "site not found"); - return error_response(StatusCode::NOT_FOUND, "not_found").into_response(); - } - }; - - // Validate Bearer token (skip if auth disabled for this site) - if site.webhook_token.is_empty() { - // Auth disabled — rate limit by site name instead - if state.rate_limiter.check_key(&site_name).is_err() { - info!(%site_name, "rate limit exceeded"); - return error_response(StatusCode::TOO_MANY_REQUESTS, "rate_limit_exceeded") - .into_response(); - } - } else { - let Some(token) = extract_bearer_token(&headers) else { - info!(%site_name, "missing or malformed authorization header"); - return error_response(StatusCode::UNAUTHORIZED, "unauthorized").into_response(); - }; +/// Check if path is a single segment (e.g., "/my-site"). +fn is_site_path(path: &str) -> bool { + path.starts_with('/') && path.len() > 1 && !path[1..].contains('/') +} - if !validate_token(token, &site.webhook_token) { - info!(%site_name, "invalid token"); - return error_response(StatusCode::UNAUTHORIZED, "unauthorized").into_response(); - } +/// Handle POST `/{site_name}`. +fn handle_deploy( + request: Request, + site_name: &str, + state: &AppState, + handle: &tokio::runtime::Handle, +) { + info!("[{site_name}] deployment request received"); + + // Find site + let site = state + .config + .read() + .expect("config lock poisoned") + .find_site(site_name) + .cloned(); + let Some(site) = site else { + info!("[{site_name}] site not found"); + let body = serde_json::to_string(&ErrorResponse { error: "not_found" }) + .expect("static JSON serialization"); + let _ = request.respond(json_response(404, &body)); + return; + }; - // Rate limit check (per token) - if state.rate_limiter.check_key(&token.to_owned()).is_err() { - info!(%site_name, "rate limit exceeded"); - return error_response(StatusCode::TOO_MANY_REQUESTS, "rate_limit_exceeded") - .into_response(); + // Auth check (if configured) + if !site.webhook_token.is_empty() { + let token_valid = extract_bearer_token(request.headers()) + .is_some_and(|token| validate_token(token, &site.webhook_token)); + + if !token_valid { + info!("[{site_name}] unauthorized request"); + let body = serde_json::to_string(&ErrorResponse { + error: "unauthorized", + }) + .expect("static JSON serialization"); + let _ = request.respond(json_response(401, &body)); + return; } } // Try immediate build - let Some(guard) = BuildGuard::try_acquire(site_name.clone(), &state.build_scheduler) else { + let Some(guard) = BuildGuard::try_acquire(site_name.to_owned(), &state.build_scheduler) else { // Build in progress — try to queue - if state.build_scheduler.try_queue(&site_name) { - info!(%site_name, "build queued"); - return ( - StatusCode::ACCEPTED, - Json(QueuedResponse { status: "queued" }), - ) - .into_response(); + if state.build_scheduler.try_queue(site_name) { + info!("[{site_name}] build queued"); + let body = serde_json::to_string(&QueuedResponse { status: "queued" }) + .expect("static JSON serialization"); + let _ = request.respond(json_response(202, &body)); + return; } // Already queued — collapse - info!(%site_name, "build already queued, collapsing"); - return StatusCode::ACCEPTED.into_response(); + info!("[{site_name}] build already queued, collapsing"); + let _ = request.respond(empty_response(202)); + return; }; - info!(%site_name, "deployment accepted"); + info!("[{site_name}] deployment accepted"); // Spawn async build pipeline with queue drain loop - tokio::spawn(async move { + let state = state.clone(); + let site_name = site_name.to_owned(); + handle.spawn(async move { let mut current_site = site; let mut current_guard = guard; loop { @@ -160,9 +151,15 @@ async fn deploy_handler( if !state.build_scheduler.take_queued(&site_name) { break; } - info!(%site_name, "processing queued rebuild"); - let Some(new_site) = state.config.read().await.find_site(&site_name).cloned() else { - warn!(%site_name, "site removed from config, skipping queued rebuild"); + info!("[{site_name}] processing queued rebuild"); + let Some(new_site) = state + .config + .read() + .expect("config lock poisoned") + .find_site(&site_name) + .cloned() + else { + warn!("[{site_name}] site removed from config, skipping queued rebuild"); break; }; let Some(new_guard) = @@ -175,7 +172,43 @@ async fn deploy_handler( } }); - StatusCode::ACCEPTED.into_response() + let _ = request.respond(empty_response(202)); +} + +/// Main request loop (runs on `std::thread`). +#[allow(clippy::needless_pass_by_value)] // ownership required by std::thread::spawn callers +pub(crate) fn handle_requests( + server: Arc<Server>, + state: AppState, + handle: tokio::runtime::Handle, +) { + for request in server.incoming_requests() { + let path = request.url().split('?').next().unwrap_or("").to_owned(); + let method = request.method().clone(); + + match (method, path.as_str()) { + (Method::Get, "/health") => { + let body = serde_json::to_string(&HealthResponse { status: "ok" }) + .expect("static JSON serialization"); + let _ = request.respond(json_response(200, &body)); + } + (_, "/health") => { + let _ = request.respond(empty_response(405)); + } + (Method::Post, _) if is_site_path(&path) => { + let site_name = &path[1..]; + handle_deploy(request, site_name, &state, &handle); + } + (_, _) if is_site_path(&path) => { + let _ = request.respond(empty_response(405)); + } + _ => { + let body = serde_json::to_string(&ErrorResponse { error: "not_found" }) + .expect("static JSON serialization"); + let _ = request.respond(json_response(404, &body)); + } + } + } } /// Run the complete build pipeline: git sync → build → publish. @@ -187,7 +220,7 @@ pub(crate) async fn run_build_pipeline( _guard: BuildGuard, ) { let (base_dir, log_dir, container_runtime, max_builds_to_keep, git_timeout) = { - let config = state.config.read().await; + let config = state.config.read().expect("config lock poisoned"); ( config.base_dir.clone(), config.log_dir.clone(), @@ -213,14 +246,13 @@ pub(crate) async fn run_build_pipeline( { Ok(result) => { info!( - %site_name, - build_dir = %result.build_dir.display(), - duration_secs = result.duration.as_secs(), - "pipeline completed" + "[{site_name}] pipeline completed: build_dir={} duration_secs={}", + result.build_dir.display(), + result.duration.as_secs() ); } Err(e) => { - error!(%site_name, error = %e, "pipeline failed"); + error!("[{site_name}] pipeline failed: {e}"); } } } @@ -239,38 +271,41 @@ pub(crate) fn setup_sighup_handler(state: AppState) { let config_path = state.config_path.as_ref(); match Config::load(config_path).await { Ok(new_config) => { - let old_sites_count = state.config.read().await.sites.len(); + let old_sites_count = state + .config + .read() + .expect("config lock poisoned") + .sites + .len(); let new_sites_count = new_config.sites.len(); // Check for non-reloadable changes and capture old values let (old_listen, old_base, old_log_dir, old_log_level) = { - let old_config = state.config.read().await; + let old_config = state.config.read().expect("config lock poisoned"); if old_config.listen_address != new_config.listen_address { warn!( - old = %old_config.listen_address, - new = %new_config.listen_address, - "listen_address changed but cannot be reloaded (restart required)" + "listen_address changed but cannot be reloaded (restart required): old={} new={}", + old_config.listen_address, new_config.listen_address ); } if old_config.base_dir != new_config.base_dir { warn!( - old = %old_config.base_dir.display(), - new = %new_config.base_dir.display(), - "base_dir changed but cannot be reloaded (restart required)" + "base_dir changed but cannot be reloaded (restart required): old={} new={}", + old_config.base_dir.display(), + new_config.base_dir.display() ); } if old_config.log_dir != new_config.log_dir { warn!( - old = %old_config.log_dir.display(), - new = %new_config.log_dir.display(), - "log_dir changed but cannot be reloaded (restart required)" + "log_dir changed but cannot be reloaded (restart required): old={} new={}", + old_config.log_dir.display(), + new_config.log_dir.display() ); } if old_config.log_level != new_config.log_level { warn!( - old = %old_config.log_level, - new = %new_config.log_level, - "log_level changed but cannot be reloaded (restart required)" + "log_level changed but cannot be reloaded (restart required): old={} new={}", + old_config.log_level, new_config.log_level ); } ( @@ -289,7 +324,7 @@ pub(crate) fn setup_sighup_handler(state: AppState) { final_config.log_level = old_log_level; // Apply the merged configuration - *state.config.write().await = final_config; + *state.config.write().expect("config lock poisoned") = final_config; // Restart polling tasks with new configuration info!("restarting polling tasks"); @@ -297,12 +332,11 @@ pub(crate) fn setup_sighup_handler(state: AppState) { state.polling_manager.start_polling(state.clone()).await; info!( - old_sites_count, - new_sites_count, "configuration reloaded successfully" + "configuration reloaded successfully: old_sites_count={old_sites_count} new_sites_count={new_sites_count}" ); } Err(e) => { - error!(error = %e, "failed to reload configuration, keeping current config"); + error!("failed to reload configuration, keeping current config: {e}"); } } } @@ -315,28 +349,14 @@ pub(crate) fn setup_sighup_handler(state: AppState) { /// /// Returns an error if the TCP listener cannot bind or the server encounters /// a fatal I/O error. -/// -/// # Panics -/// -/// Panics if `rate_limit_per_minute` is zero. This is unreachable after -/// successful config validation. pub async fn run(config: Config, config_path: PathBuf) -> Result<()> { let addr = config.parsed_listen_address(); - #[allow(clippy::expect_used)] // validated by Config::validate_rate_limit() - let quota = Quota::per_minute( - NonZeroU32::new(config.rate_limit_per_minute) - .expect("rate_limit_per_minute must be greater than 0"), - ); - let rate_limiter = Arc::new(RateLimiter::dashmap(quota)); - let polling_manager = Arc::new(PollingManager::new()); - let state = AppState { config: Arc::new(RwLock::new(config)), config_path: Arc::new(config_path), build_scheduler: Arc::new(BuildScheduler::new()), - rate_limiter, - polling_manager, + polling_manager: Arc::new(PollingManager::new()), }; // Setup SIGHUP handler for configuration hot-reload @@ -345,37 +365,54 @@ pub async fn run(config: Config, config_path: PathBuf) -> Result<()> { // Start polling tasks for sites with poll_interval configured state.polling_manager.start_polling(state.clone()).await; - let listener = TcpListener::bind(addr).await?; - info!(%addr, "server listening"); + let server = Arc::new(Server::http(addr).map_err(|e| anyhow::anyhow!("failed to bind: {e}"))?); + info!("server listening on {addr}"); - run_with_listener(state, listener, async { + // Shutdown handler: signal → unblock server + let shutdown_server = Arc::clone(&server); + tokio::spawn(async move { let mut sigterm = signal(SignalKind::terminate()).expect("failed to setup SIGTERM handler"); let mut sigint = signal(SignalKind::interrupt()).expect("failed to setup SIGINT handler"); tokio::select! { _ = sigterm.recv() => info!("received SIGTERM, shutting down"), _ = sigint.recv() => info!("received SIGINT, shutting down"), } + shutdown_server.unblock(); + }); + + // Run HTTP loop on blocking thread + let handle = tokio::runtime::Handle::current(); + tokio::task::spawn_blocking(move || { + handle_requests(server, state, handle); }) - .await + .await?; + + Ok(()) } -/// Run the server on an already-bound listener with a custom shutdown signal. +/// Run the server with a pre-built Server, shutting down when `shutdown_signal` resolves. /// -/// This is the core server loop used by both production (`run`) and integration tests. -/// Production delegates here after binding the listener and setting up SIGHUP handlers. -/// Tests call this via `test_support::run_server` with their own listener and shutdown channel. -pub(crate) async fn run_with_listener( +/// Used by integration tests via [`test_support::run_server`]. +/// Returns a `std::thread::JoinHandle` for the request-handling thread. +#[cfg(any(test, feature = "integration"))] +pub(crate) fn run_with_server( state: AppState, - listener: TcpListener, + server: Arc<Server>, shutdown_signal: impl std::future::Future<Output = ()> + Send + 'static, -) -> Result<()> { - let router = create_router(state); +) -> std::thread::JoinHandle<()> { + let handle = tokio::runtime::Handle::current(); - axum::serve(listener, router) - .with_graceful_shutdown(shutdown_signal) - .await?; + // Shutdown: wait for signal, then unblock + let shutdown_server = Arc::clone(&server); + tokio::spawn(async move { + shutdown_signal.await; + shutdown_server.unblock(); + }); - Ok(()) + // Spawn request handler on std::thread, return handle for joining + std::thread::spawn(move || { + handle_requests(server, state, handle); + }) } #[cfg(test)] @@ -383,23 +420,13 @@ pub(crate) async fn run_with_listener( mod tests { use super::*; use crate::config::{BuildOverrides, SiteConfig}; - use axum::body::Body; - use axum::http::{Request, StatusCode}; - use axum::response::Response; use std::path::PathBuf; - use tower::ServiceExt as _; fn test_state(config: Config) -> AppState { - test_state_with_rate_limit(config, 1000) // High limit for most tests - } - - fn test_state_with_rate_limit(config: Config, rate_limit: u32) -> AppState { - let quota = Quota::per_minute(NonZeroU32::new(rate_limit).unwrap()); AppState { config: Arc::new(RwLock::new(config)), config_path: Arc::new(PathBuf::from("witryna.toml")), build_scheduler: Arc::new(BuildScheduler::new()), - rate_limiter: Arc::new(RateLimiter::dashmap(quota)), polling_manager: Arc::new(PollingManager::new()), } } @@ -411,7 +438,6 @@ mod tests { base_dir: PathBuf::from("/var/lib/witryna"), log_dir: PathBuf::from("/var/log/witryna"), log_level: "info".to_owned(), - rate_limit_per_minute: 10, max_builds_to_keep: 5, git_timeout: None, sites: vec![], @@ -445,221 +471,169 @@ mod tests { } } + /// Start a test server on a random port, returning the server handle, state, and port. + fn test_server(config: Config) -> (Arc<Server>, AppState, u16) { + let state = test_state(config); + let server = Arc::new(Server::http("127.0.0.1:0").unwrap()); + let port = match server.server_addr() { + tiny_http::ListenAddr::IP(a) => a.port(), + _ => unreachable!("expected IP address"), + }; + let handle = tokio::runtime::Handle::current(); + let server_clone = server.clone(); + let state_clone = state.clone(); + std::thread::spawn(move || handle_requests(server_clone, state_clone, handle)); + (server, state, port) + } + #[tokio::test] async fn health_endpoint_returns_ok() { - let state = test_state(test_config_with_sites()); - let router = create_router(state); - - let response: Response = router - .oneshot( - Request::builder() - .uri("/health") - .body(Body::empty()) - .unwrap(), - ) + let (server, _state, port) = test_server(test_config_with_sites()); + let resp = reqwest::get(format!("http://127.0.0.1:{port}/health")) .await .unwrap(); - - assert_eq!(response.status(), StatusCode::OK); - let body = axum::body::to_bytes(response.into_body(), 1024) - .await - .unwrap(); - let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(resp.status().as_u16(), 200); + let json: serde_json::Value = resp.json().await.unwrap(); assert_eq!(json["status"], "ok"); + server.unblock(); } #[tokio::test] - async fn unknown_site_post_returns_not_found() { - let state = test_state(test_config()); - let router = create_router(state); - - let response: Response = router - .oneshot( - Request::builder() - .method("POST") - .uri("/nonexistent") - .body(Body::empty()) - .unwrap(), - ) - .await - .unwrap(); + async fn json_responses_have_content_type_header() { + let (server, _state, port) = test_server(test_config_with_sites()); + let resp = reqwest::get(format!("http://127.0.0.1:{port}/health")) + .await + .unwrap(); + assert_eq!( + resp.headers() + .get("content-type") + .unwrap() + .to_str() + .unwrap(), + "application/json" + ); + server.unblock(); + } - assert_eq!(response.status(), StatusCode::NOT_FOUND); - let body = axum::body::to_bytes(response.into_body(), 1024) + #[tokio::test] + async fn unknown_site_post_returns_not_found() { + let (server, _state, port) = test_server(test_config()); + let client = reqwest::Client::new(); + let resp = client + .post(format!("http://127.0.0.1:{port}/nonexistent")) + .send() .await .unwrap(); - let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(resp.status().as_u16(), 404); + let json: serde_json::Value = resp.json().await.unwrap(); assert_eq!(json["error"], "not_found"); + server.unblock(); } #[tokio::test] async fn deploy_known_site_with_valid_token_returns_accepted() { - let state = test_state(test_config_with_sites()); - let router = create_router(state); - - let response: Response = router - .oneshot( - Request::builder() - .method("POST") - .uri("/my-site") - .header("Authorization", "Bearer secret-token") - .body(Body::empty()) - .unwrap(), - ) - .await - .unwrap(); - - assert_eq!(response.status(), StatusCode::ACCEPTED); - let body = axum::body::to_bytes(response.into_body(), 1024) + let (server, _state, port) = test_server(test_config_with_sites()); + let client = reqwest::Client::new(); + let resp = client + .post(format!("http://127.0.0.1:{port}/my-site")) + .header("Authorization", "Bearer secret-token") + .send() .await .unwrap(); - assert!(body.is_empty()); + assert_eq!(resp.status().as_u16(), 202); + server.unblock(); } #[tokio::test] async fn deploy_missing_auth_header_returns_unauthorized() { - let state = test_state(test_config_with_sites()); - let router = create_router(state); - - let response: Response = router - .oneshot( - Request::builder() - .method("POST") - .uri("/my-site") - .body(Body::empty()) - .unwrap(), - ) - .await - .unwrap(); - - assert_eq!(response.status(), StatusCode::UNAUTHORIZED); - let body = axum::body::to_bytes(response.into_body(), 1024) + let (server, _state, port) = test_server(test_config_with_sites()); + let client = reqwest::Client::new(); + let resp = client + .post(format!("http://127.0.0.1:{port}/my-site")) + .send() .await .unwrap(); - let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(resp.status().as_u16(), 401); + let json: serde_json::Value = resp.json().await.unwrap(); assert_eq!(json["error"], "unauthorized"); + server.unblock(); } #[tokio::test] async fn deploy_invalid_token_returns_unauthorized() { - let state = test_state(test_config_with_sites()); - let router = create_router(state); - - let response: Response = router - .oneshot( - Request::builder() - .method("POST") - .uri("/my-site") - .header("Authorization", "Bearer wrong-token") - .body(Body::empty()) - .unwrap(), - ) + let (server, _state, port) = test_server(test_config_with_sites()); + let client = reqwest::Client::new(); + let resp = client + .post(format!("http://127.0.0.1:{port}/my-site")) + .header("Authorization", "Bearer wrong-token") + .send() .await .unwrap(); - - assert_eq!(response.status(), StatusCode::UNAUTHORIZED); - let body = axum::body::to_bytes(response.into_body(), 1024) - .await - .unwrap(); - let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(resp.status().as_u16(), 401); + let json: serde_json::Value = resp.json().await.unwrap(); assert_eq!(json["error"], "unauthorized"); + server.unblock(); } #[tokio::test] async fn deploy_malformed_auth_header_returns_unauthorized() { - let state = test_state(test_config_with_sites()); - let router = create_router(state); - + let (server, _state, port) = test_server(test_config_with_sites()); + let client = reqwest::Client::new(); // Test without "Bearer " prefix - let response: Response = router - .oneshot( - Request::builder() - .method("POST") - .uri("/my-site") - .header("Authorization", "secret-token") - .body(Body::empty()) - .unwrap(), - ) + let resp = client + .post(format!("http://127.0.0.1:{port}/my-site")) + .header("Authorization", "secret-token") + .send() .await .unwrap(); - - assert_eq!(response.status(), StatusCode::UNAUTHORIZED); - let body = axum::body::to_bytes(response.into_body(), 1024) - .await - .unwrap(); - let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(resp.status().as_u16(), 401); + let json: serde_json::Value = resp.json().await.unwrap(); assert_eq!(json["error"], "unauthorized"); + server.unblock(); } #[tokio::test] async fn deploy_basic_auth_returns_unauthorized() { - let state = test_state(test_config_with_sites()); - let router = create_router(state); - + let (server, _state, port) = test_server(test_config_with_sites()); + let client = reqwest::Client::new(); // Test Basic auth instead of Bearer - let response: Response = router - .oneshot( - Request::builder() - .method("POST") - .uri("/my-site") - .header("Authorization", "Basic dXNlcjpwYXNz") - .body(Body::empty()) - .unwrap(), - ) - .await - .unwrap(); - - assert_eq!(response.status(), StatusCode::UNAUTHORIZED); - let body = axum::body::to_bytes(response.into_body(), 1024) + let resp = client + .post(format!("http://127.0.0.1:{port}/my-site")) + .header("Authorization", "Basic dXNlcjpwYXNz") + .send() .await .unwrap(); - let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(resp.status().as_u16(), 401); + let json: serde_json::Value = resp.json().await.unwrap(); assert_eq!(json["error"], "unauthorized"); + server.unblock(); } #[tokio::test] async fn deploy_get_method_not_allowed() { - let state = test_state(test_config_with_sites()); - let router = create_router(state); - - let response: Response = router - .oneshot( - Request::builder() - .method("GET") - .uri("/my-site") - .body(Body::empty()) - .unwrap(), - ) + let (server, _state, port) = test_server(test_config_with_sites()); + let resp = reqwest::get(format!("http://127.0.0.1:{port}/my-site")) .await .unwrap(); - - assert_eq!(response.status(), StatusCode::METHOD_NOT_ALLOWED); + assert_eq!(resp.status().as_u16(), 405); + server.unblock(); } #[tokio::test] async fn deploy_unknown_site_with_token_returns_not_found() { - let state = test_state(test_config_with_sites()); - let router = create_router(state); - - let response: Response = router - .oneshot( - Request::builder() - .method("POST") - .uri("/unknown-site") - .header("Authorization", "Bearer any-token") - .body(Body::empty()) - .unwrap(), - ) + let (server, _state, port) = test_server(test_config_with_sites()); + let client = reqwest::Client::new(); + let resp = client + .post(format!("http://127.0.0.1:{port}/unknown-site")) + .header("Authorization", "Bearer any-token") + .send() .await .unwrap(); - // Returns 404 before checking token (site lookup first) - assert_eq!(response.status(), StatusCode::NOT_FOUND); - let body = axum::body::to_bytes(response.into_body(), 1024) - .await - .unwrap(); - let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(resp.status().as_u16(), 404); + let json: serde_json::Value = resp.json().await.unwrap(); assert_eq!(json["error"], "not_found"); + server.unblock(); } fn test_config_with_two_sites() -> Config { @@ -669,7 +643,6 @@ mod tests { base_dir: PathBuf::from("/var/lib/witryna"), log_dir: PathBuf::from("/var/log/witryna"), log_level: "info".to_owned(), - rate_limit_per_minute: 10, max_builds_to_keep: 5, git_timeout: None, sites: vec![ @@ -721,290 +694,92 @@ mod tests { #[tokio::test] async fn deploy_concurrent_same_site_gets_queued() { - let state = test_state(test_config_with_sites()); - let router = create_router(state.clone()); - - // First request should succeed (immediate build) - let response1: Response = router - .clone() - .oneshot( - Request::builder() - .method("POST") - .uri("/my-site") - .header("Authorization", "Bearer secret-token") - .body(Body::empty()) - .unwrap(), - ) - .await - .unwrap(); - assert_eq!(response1.status(), StatusCode::ACCEPTED); - let body1 = axum::body::to_bytes(response1.into_body(), 1024) - .await - .unwrap(); - assert!(body1.is_empty()); - - // Second request to same site should be queued (202 with body) - let response2: Response = router - .clone() - .oneshot( - Request::builder() - .method("POST") - .uri("/my-site") - .header("Authorization", "Bearer secret-token") - .body(Body::empty()) - .unwrap(), - ) - .await - .unwrap(); - assert_eq!(response2.status(), StatusCode::ACCEPTED); - let body2 = axum::body::to_bytes(response2.into_body(), 1024) + let (server, state, port) = test_server(test_config_with_sites()); + let client = reqwest::Client::new(); + + // Pre-mark site as building to simulate an in-progress build + state + .build_scheduler + .in_progress + .lock() + .unwrap() + .insert("my-site".to_owned()); + + // First request to same site should be queued (202 with body) + let resp1 = client + .post(format!("http://127.0.0.1:{port}/my-site")) + .header("Authorization", "Bearer secret-token") + .send() .await .unwrap(); - let json: serde_json::Value = serde_json::from_slice(&body2).unwrap(); + assert_eq!(resp1.status().as_u16(), 202); + let json: serde_json::Value = resp1.json().await.unwrap(); assert_eq!(json["status"], "queued"); - // Third request should be collapsed (202, no body) - let response3: Response = router - .oneshot( - Request::builder() - .method("POST") - .uri("/my-site") - .header("Authorization", "Bearer secret-token") - .body(Body::empty()) - .unwrap(), - ) - .await - .unwrap(); - assert_eq!(response3.status(), StatusCode::ACCEPTED); - let body3 = axum::body::to_bytes(response3.into_body(), 1024) + // Second request should be collapsed (202, no body) + let resp2 = client + .post(format!("http://127.0.0.1:{port}/my-site")) + .header("Authorization", "Bearer secret-token") + .send() .await .unwrap(); - assert!(body3.is_empty()); + assert_eq!(resp2.status().as_u16(), 202); + + server.unblock(); } #[tokio::test] async fn deploy_concurrent_different_sites_both_succeed() { - let state = test_state(test_config_with_two_sites()); - let router = create_router(state.clone()); + let (server, _state, port) = test_server(test_config_with_two_sites()); + let client = reqwest::Client::new(); // First site deployment - let response1: Response = router - .clone() - .oneshot( - Request::builder() - .method("POST") - .uri("/site-one") - .header("Authorization", "Bearer token-one") - .body(Body::empty()) - .unwrap(), - ) + let resp1 = client + .post(format!("http://127.0.0.1:{port}/site-one")) + .header("Authorization", "Bearer token-one") + .send() .await .unwrap(); - assert_eq!(response1.status(), StatusCode::ACCEPTED); + assert_eq!(resp1.status().as_u16(), 202); // Second site deployment should also succeed - let response2: Response = router - .oneshot( - Request::builder() - .method("POST") - .uri("/site-two") - .header("Authorization", "Bearer token-two") - .body(Body::empty()) - .unwrap(), - ) + let resp2 = client + .post(format!("http://127.0.0.1:{port}/site-two")) + .header("Authorization", "Bearer token-two") + .send() .await .unwrap(); - assert_eq!(response2.status(), StatusCode::ACCEPTED); + assert_eq!(resp2.status().as_u16(), 202); + + server.unblock(); } #[tokio::test] async fn deploy_site_in_progress_checked_after_auth() { - let state = test_state(test_config_with_sites()); + let (server, state, port) = test_server(test_config_with_sites()); // Pre-mark site as building state .build_scheduler .in_progress + .lock() + .unwrap() .insert("my-site".to_owned()); - let router = create_router(state); + let client = reqwest::Client::new(); // Request with wrong token should return 401 (auth checked before build status) - let response: Response = router - .oneshot( - Request::builder() - .method("POST") - .uri("/my-site") - .header("Authorization", "Bearer wrong-token") - .body(Body::empty()) - .unwrap(), - ) + let resp = client + .post(format!("http://127.0.0.1:{port}/my-site")) + .header("Authorization", "Bearer wrong-token") + .send() .await .unwrap(); - assert_eq!(response.status(), StatusCode::UNAUTHORIZED); - let body = axum::body::to_bytes(response.into_body(), 1024) - .await - .unwrap(); - let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); + assert_eq!(resp.status().as_u16(), 401); + let json: serde_json::Value = resp.json().await.unwrap(); assert_eq!(json["error"], "unauthorized"); - } - #[tokio::test] - async fn rate_limit_exceeded_returns_429() { - // Create state with rate limit of 2 per minute - let state = test_state_with_rate_limit(test_config_with_sites(), 2); - let router = create_router(state); - - // First request should succeed - let response1: Response = router - .clone() - .oneshot( - Request::builder() - .method("POST") - .uri("/my-site") - .header("Authorization", "Bearer secret-token") - .body(Body::empty()) - .unwrap(), - ) - .await - .unwrap(); - assert_eq!(response1.status(), StatusCode::ACCEPTED); - - // Second request should succeed (or 409 if build in progress) - let response2: Response = router - .clone() - .oneshot( - Request::builder() - .method("POST") - .uri("/my-site") - .header("Authorization", "Bearer secret-token") - .body(Body::empty()) - .unwrap(), - ) - .await - .unwrap(); - // Could be 202 or 409 depending on timing - assert!( - response2.status() == StatusCode::ACCEPTED - || response2.status() == StatusCode::CONFLICT - ); - - // Third request should hit rate limit - let response3: Response = router - .oneshot( - Request::builder() - .method("POST") - .uri("/my-site") - .header("Authorization", "Bearer secret-token") - .body(Body::empty()) - .unwrap(), - ) - .await - .unwrap(); - assert_eq!(response3.status(), StatusCode::TOO_MANY_REQUESTS); - let body = axum::body::to_bytes(response3.into_body(), 1024) - .await - .unwrap(); - let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); - assert_eq!(json["error"], "rate_limit_exceeded"); - } - - #[tokio::test] - async fn rate_limit_different_tokens_independent() { - // Create state with rate limit of 1 per minute - let state = test_state_with_rate_limit(test_config_with_two_sites(), 1); - let router = create_router(state); - - // First request with token-one should succeed - let response1: Response = router - .clone() - .oneshot( - Request::builder() - .method("POST") - .uri("/site-one") - .header("Authorization", "Bearer token-one") - .body(Body::empty()) - .unwrap(), - ) - .await - .unwrap(); - assert_eq!(response1.status(), StatusCode::ACCEPTED); - - // Second request with token-one should hit rate limit - let response2: Response = router - .clone() - .oneshot( - Request::builder() - .method("POST") - .uri("/site-one") - .header("Authorization", "Bearer token-one") - .body(Body::empty()) - .unwrap(), - ) - .await - .unwrap(); - assert_eq!(response2.status(), StatusCode::TOO_MANY_REQUESTS); - let body = axum::body::to_bytes(response2.into_body(), 1024) - .await - .unwrap(); - let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); - assert_eq!(json["error"], "rate_limit_exceeded"); - - // Request with different token should still succeed - let response3: Response = router - .oneshot( - Request::builder() - .method("POST") - .uri("/site-two") - .header("Authorization", "Bearer token-two") - .body(Body::empty()) - .unwrap(), - ) - .await - .unwrap(); - assert_eq!(response3.status(), StatusCode::ACCEPTED); - } - - #[tokio::test] - async fn rate_limit_checked_after_auth() { - // Create state with rate limit of 1 per minute - let state = test_state_with_rate_limit(test_config_with_sites(), 1); - let router = create_router(state); - - // First valid request exhausts rate limit - let response1: Response = router - .clone() - .oneshot( - Request::builder() - .method("POST") - .uri("/my-site") - .header("Authorization", "Bearer secret-token") - .body(Body::empty()) - .unwrap(), - ) - .await - .unwrap(); - assert_eq!(response1.status(), StatusCode::ACCEPTED); - - // Request with invalid token should return 401, not 429 - // (auth is checked before rate limit) - let response2: Response = router - .oneshot( - Request::builder() - .method("POST") - .uri("/my-site") - .header("Authorization", "Bearer wrong-token") - .body(Body::empty()) - .unwrap(), - ) - .await - .unwrap(); - assert_eq!(response2.status(), StatusCode::UNAUTHORIZED); - let body = axum::body::to_bytes(response2.into_body(), 1024) - .await - .unwrap(); - let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); - assert_eq!(json["error"], "unauthorized"); + server.unblock(); } #[tokio::test] @@ -1016,7 +791,6 @@ mod tests { base_dir: PathBuf::from("/var/lib/witryna"), log_dir: PathBuf::from("/var/log/witryna"), log_level: "info".to_owned(), - rate_limit_per_minute: 10, max_builds_to_keep: 5, git_timeout: None, sites: vec![SiteConfig { @@ -1052,7 +826,6 @@ mod tests { base_dir: PathBuf::from("/tmp/new-base"), log_dir: PathBuf::from("/tmp/new-logs"), log_level: "debug".to_owned(), - rate_limit_per_minute: 20, max_builds_to_keep: 10, git_timeout: None, sites: vec![SiteConfig { @@ -1080,7 +853,7 @@ mod tests { // Apply the same merge logic used in setup_sighup_handler let (old_listen, old_base, old_log_dir, old_log_level) = { - let old_config = state.config.read().await; + let old_config = state.config.read().unwrap(); ( old_config.listen_address.clone(), old_config.base_dir.clone(), @@ -1095,21 +868,30 @@ mod tests { final_config.log_dir = old_log_dir; final_config.log_level = old_log_level; - *state.config.write().await = final_config; - - // Verify non-reloadable fields are preserved - let config = state.config.read().await; - assert_eq!(config.listen_address, "127.0.0.1:8080"); - assert_eq!(config.base_dir, PathBuf::from("/var/lib/witryna")); - assert_eq!(config.log_dir, PathBuf::from("/var/log/witryna")); - assert_eq!(config.log_level, "info"); - - // Verify reloadable fields are updated - assert_eq!(config.container_runtime, "docker"); - assert_eq!(config.rate_limit_per_minute, 20); - assert_eq!(config.max_builds_to_keep, 10); - assert_eq!(config.sites.len(), 1); - assert_eq!(config.sites[0].name, "new-site"); + *state.config.write().unwrap() = final_config; + + // Verify non-reloadable fields are preserved and reloadable fields are updated + let (listen, base, log_d, log_l, runtime, max_builds, sites_len, site_name) = { + let config = state.config.read().unwrap(); + ( + config.listen_address.clone(), + config.base_dir.clone(), + config.log_dir.clone(), + config.log_level.clone(), + config.container_runtime.clone(), + config.max_builds_to_keep, + config.sites.len(), + config.sites[0].name.clone(), + ) + }; + assert_eq!(listen, "127.0.0.1:8080"); + assert_eq!(base, PathBuf::from("/var/lib/witryna")); + assert_eq!(log_d, PathBuf::from("/var/log/witryna")); + assert_eq!(log_l, "info"); + assert_eq!(runtime, "docker"); + assert_eq!(max_builds, 10); + assert_eq!(sites_len, 1); + assert_eq!(site_name, "new-site"); } fn test_config_with_disabled_auth() -> Config { @@ -1140,80 +922,34 @@ mod tests { #[tokio::test] async fn deploy_disabled_auth_returns_accepted() { - let state = test_state(test_config_with_disabled_auth()); - let router = create_router(state); + let (server, _state, port) = test_server(test_config_with_disabled_auth()); + let client = reqwest::Client::new(); // Request without Authorization header should succeed - let response: Response = router - .oneshot( - Request::builder() - .method("POST") - .uri("/open-site") - .body(Body::empty()) - .unwrap(), - ) + let resp = client + .post(format!("http://127.0.0.1:{port}/open-site")) + .send() .await .unwrap(); + assert_eq!(resp.status().as_u16(), 202); - assert_eq!(response.status(), StatusCode::ACCEPTED); + server.unblock(); } #[tokio::test] async fn deploy_disabled_auth_ignores_token() { - let state = test_state(test_config_with_disabled_auth()); - let router = create_router(state); + let (server, _state, port) = test_server(test_config_with_disabled_auth()); + let client = reqwest::Client::new(); // Request WITH a Bearer token should also succeed (token ignored) - let response: Response = router - .oneshot( - Request::builder() - .method("POST") - .uri("/open-site") - .header("Authorization", "Bearer any-token") - .body(Body::empty()) - .unwrap(), - ) + let resp = client + .post(format!("http://127.0.0.1:{port}/open-site")) + .header("Authorization", "Bearer any-token") + .send() .await .unwrap(); + assert_eq!(resp.status().as_u16(), 202); - assert_eq!(response.status(), StatusCode::ACCEPTED); - } - - #[tokio::test] - async fn deploy_disabled_auth_rate_limited_by_site_name() { - let state = test_state_with_rate_limit(test_config_with_disabled_auth(), 1); - let router = create_router(state); - - // First request should succeed - let response1: Response = router - .clone() - .oneshot( - Request::builder() - .method("POST") - .uri("/open-site") - .body(Body::empty()) - .unwrap(), - ) - .await - .unwrap(); - assert_eq!(response1.status(), StatusCode::ACCEPTED); - - // Second request should hit rate limit (keyed by site name) - let response2: Response = router - .oneshot( - Request::builder() - .method("POST") - .uri("/open-site") - .body(Body::empty()) - .unwrap(), - ) - .await - .unwrap(); - assert_eq!(response2.status(), StatusCode::TOO_MANY_REQUESTS); - let body = axum::body::to_bytes(response2.into_body(), 1024) - .await - .unwrap(); - let json: serde_json::Value = serde_json::from_slice(&body).unwrap(); - assert_eq!(json["error"], "rate_limit_exceeded"); + server.unblock(); } } |
