summaryrefslogtreecommitdiff
path: root/src/server.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/server.rs')
-rw-r--r--src/server.rs1026
1 files changed, 381 insertions, 645 deletions
diff --git a/src/server.rs b/src/server.rs
index e31a1e4..a2aef5c 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -2,24 +2,11 @@ use crate::build_guard::{BuildGuard, BuildScheduler};
use crate::config::{Config, SiteConfig};
use crate::polling::PollingManager;
use anyhow::Result;
-use axum::{
- Json, Router,
- extract::{DefaultBodyLimit, Path, State},
- http::{HeaderMap, StatusCode},
- response::IntoResponse,
- routing::{get, post},
-};
-use governor::clock::DefaultClock;
-use governor::state::keyed::DashMapStateStore;
-use governor::{Quota, RateLimiter};
-use std::num::NonZeroU32;
+use log::{error, info, warn};
use std::path::PathBuf;
-use std::sync::Arc;
-use subtle::ConstantTimeEq as _;
-use tokio::net::TcpListener;
+use std::sync::{Arc, RwLock};
+use tiny_http::{Header, Method, Request, Response, Server};
use tokio::signal::unix::{SignalKind, signal};
-use tokio::sync::RwLock;
-use tracing::{error, info, warn};
#[derive(serde::Serialize)]
struct ErrorResponse {
@@ -36,114 +23,118 @@ struct HealthResponse {
status: &'static str,
}
-fn error_response(status: StatusCode, error: &'static str) -> impl IntoResponse {
- (status, Json(ErrorResponse { error }))
+fn json_response(status: u16, body: &str) -> Response<std::io::Cursor<Vec<u8>>> {
+ let data = body.as_bytes().to_vec();
+ Response::from_data(data)
+ .with_status_code(status)
+ .with_header(Header::from_bytes("Content-Type", "application/json").expect("valid header"))
}
-type TokenRateLimiter = RateLimiter<String, DashMapStateStore<String>, DefaultClock>;
+fn empty_response(status: u16) -> Response<std::io::Empty> {
+ Response::empty(status)
+}
#[derive(Clone)]
pub struct AppState {
pub config: Arc<RwLock<Config>>,
pub config_path: Arc<PathBuf>,
pub build_scheduler: Arc<BuildScheduler>,
- pub rate_limiter: Arc<TokenRateLimiter>,
pub polling_manager: Arc<PollingManager>,
}
-pub fn create_router(state: AppState) -> Router {
- Router::new()
- .route("/health", get(health_handler))
- .route("/{site_name}", post(deploy_handler))
- .layer(DefaultBodyLimit::max(1024 * 1024)) // 1MB limit
- .with_state(state)
-}
-
-async fn health_handler() -> impl IntoResponse {
- Json(HealthResponse { status: "ok" })
-}
-
-/// Extract Bearer token from Authorization header.
-fn extract_bearer_token(headers: &HeaderMap) -> Option<&str> {
+/// Extract Bearer token from `tiny_http` headers.
+fn extract_bearer_token(headers: &[Header]) -> Option<&str> {
headers
- .get("authorization")
- .and_then(|v| v.to_str().ok())
- .and_then(|v| v.strip_prefix("Bearer "))
+ .iter()
+ .find(|h| h.field.equiv("Authorization"))
+ .and_then(|h| h.value.as_str().strip_prefix("Bearer "))
}
fn validate_token(provided: &str, expected: &str) -> bool {
- let provided_bytes = provided.as_bytes();
- let expected_bytes = expected.as_bytes();
+ let a = provided.as_bytes();
+ let b = expected.as_bytes();
+
+ // Constant-time comparison — OWASP requirement.
+ // Length check is not constant-time, but token length is not secret
+ // (same early-return approach as subtle::ConstantTimeEq for slices).
+ if a.len() != b.len() {
+ return false;
+ }
- // Constant-time comparison - OWASP requirement
- provided_bytes.ct_eq(expected_bytes).into()
+ let mut acc: u8 = 0;
+ for (x, y) in a.iter().zip(b.iter()) {
+ acc |= x ^ y;
+ }
+ acc == 0
}
-async fn deploy_handler(
- State(state): State<AppState>,
- Path(site_name): Path<String>,
- headers: HeaderMap,
-) -> impl IntoResponse {
- info!(%site_name, "deployment request received");
-
- // Find the site first to avoid information leakage
- let site = {
- let config = state.config.read().await;
- if let Some(site) = config.find_site(&site_name) {
- site.clone()
- } else {
- info!(%site_name, "site not found");
- return error_response(StatusCode::NOT_FOUND, "not_found").into_response();
- }
- };
-
- // Validate Bearer token (skip if auth disabled for this site)
- if site.webhook_token.is_empty() {
- // Auth disabled — rate limit by site name instead
- if state.rate_limiter.check_key(&site_name).is_err() {
- info!(%site_name, "rate limit exceeded");
- return error_response(StatusCode::TOO_MANY_REQUESTS, "rate_limit_exceeded")
- .into_response();
- }
- } else {
- let Some(token) = extract_bearer_token(&headers) else {
- info!(%site_name, "missing or malformed authorization header");
- return error_response(StatusCode::UNAUTHORIZED, "unauthorized").into_response();
- };
+/// Check if path is a single segment (e.g., "/my-site").
+fn is_site_path(path: &str) -> bool {
+ path.starts_with('/') && path.len() > 1 && !path[1..].contains('/')
+}
- if !validate_token(token, &site.webhook_token) {
- info!(%site_name, "invalid token");
- return error_response(StatusCode::UNAUTHORIZED, "unauthorized").into_response();
- }
+/// Handle POST `/{site_name}`.
+fn handle_deploy(
+ request: Request,
+ site_name: &str,
+ state: &AppState,
+ handle: &tokio::runtime::Handle,
+) {
+ info!("[{site_name}] deployment request received");
+
+ // Find site
+ let site = state
+ .config
+ .read()
+ .expect("config lock poisoned")
+ .find_site(site_name)
+ .cloned();
+ let Some(site) = site else {
+ info!("[{site_name}] site not found");
+ let body = serde_json::to_string(&ErrorResponse { error: "not_found" })
+ .expect("static JSON serialization");
+ let _ = request.respond(json_response(404, &body));
+ return;
+ };
- // Rate limit check (per token)
- if state.rate_limiter.check_key(&token.to_owned()).is_err() {
- info!(%site_name, "rate limit exceeded");
- return error_response(StatusCode::TOO_MANY_REQUESTS, "rate_limit_exceeded")
- .into_response();
+ // Auth check (if configured)
+ if !site.webhook_token.is_empty() {
+ let token_valid = extract_bearer_token(request.headers())
+ .is_some_and(|token| validate_token(token, &site.webhook_token));
+
+ if !token_valid {
+ info!("[{site_name}] unauthorized request");
+ let body = serde_json::to_string(&ErrorResponse {
+ error: "unauthorized",
+ })
+ .expect("static JSON serialization");
+ let _ = request.respond(json_response(401, &body));
+ return;
}
}
// Try immediate build
- let Some(guard) = BuildGuard::try_acquire(site_name.clone(), &state.build_scheduler) else {
+ let Some(guard) = BuildGuard::try_acquire(site_name.to_owned(), &state.build_scheduler) else {
// Build in progress — try to queue
- if state.build_scheduler.try_queue(&site_name) {
- info!(%site_name, "build queued");
- return (
- StatusCode::ACCEPTED,
- Json(QueuedResponse { status: "queued" }),
- )
- .into_response();
+ if state.build_scheduler.try_queue(site_name) {
+ info!("[{site_name}] build queued");
+ let body = serde_json::to_string(&QueuedResponse { status: "queued" })
+ .expect("static JSON serialization");
+ let _ = request.respond(json_response(202, &body));
+ return;
}
// Already queued — collapse
- info!(%site_name, "build already queued, collapsing");
- return StatusCode::ACCEPTED.into_response();
+ info!("[{site_name}] build already queued, collapsing");
+ let _ = request.respond(empty_response(202));
+ return;
};
- info!(%site_name, "deployment accepted");
+ info!("[{site_name}] deployment accepted");
// Spawn async build pipeline with queue drain loop
- tokio::spawn(async move {
+ let state = state.clone();
+ let site_name = site_name.to_owned();
+ handle.spawn(async move {
let mut current_site = site;
let mut current_guard = guard;
loop {
@@ -160,9 +151,15 @@ async fn deploy_handler(
if !state.build_scheduler.take_queued(&site_name) {
break;
}
- info!(%site_name, "processing queued rebuild");
- let Some(new_site) = state.config.read().await.find_site(&site_name).cloned() else {
- warn!(%site_name, "site removed from config, skipping queued rebuild");
+ info!("[{site_name}] processing queued rebuild");
+ let Some(new_site) = state
+ .config
+ .read()
+ .expect("config lock poisoned")
+ .find_site(&site_name)
+ .cloned()
+ else {
+ warn!("[{site_name}] site removed from config, skipping queued rebuild");
break;
};
let Some(new_guard) =
@@ -175,7 +172,43 @@ async fn deploy_handler(
}
});
- StatusCode::ACCEPTED.into_response()
+ let _ = request.respond(empty_response(202));
+}
+
+/// Main request loop (runs on `std::thread`).
+#[allow(clippy::needless_pass_by_value)] // ownership required by std::thread::spawn callers
+pub(crate) fn handle_requests(
+ server: Arc<Server>,
+ state: AppState,
+ handle: tokio::runtime::Handle,
+) {
+ for request in server.incoming_requests() {
+ let path = request.url().split('?').next().unwrap_or("").to_owned();
+ let method = request.method().clone();
+
+ match (method, path.as_str()) {
+ (Method::Get, "/health") => {
+ let body = serde_json::to_string(&HealthResponse { status: "ok" })
+ .expect("static JSON serialization");
+ let _ = request.respond(json_response(200, &body));
+ }
+ (_, "/health") => {
+ let _ = request.respond(empty_response(405));
+ }
+ (Method::Post, _) if is_site_path(&path) => {
+ let site_name = &path[1..];
+ handle_deploy(request, site_name, &state, &handle);
+ }
+ (_, _) if is_site_path(&path) => {
+ let _ = request.respond(empty_response(405));
+ }
+ _ => {
+ let body = serde_json::to_string(&ErrorResponse { error: "not_found" })
+ .expect("static JSON serialization");
+ let _ = request.respond(json_response(404, &body));
+ }
+ }
+ }
}
/// Run the complete build pipeline: git sync → build → publish.
@@ -187,7 +220,7 @@ pub(crate) async fn run_build_pipeline(
_guard: BuildGuard,
) {
let (base_dir, log_dir, container_runtime, max_builds_to_keep, git_timeout) = {
- let config = state.config.read().await;
+ let config = state.config.read().expect("config lock poisoned");
(
config.base_dir.clone(),
config.log_dir.clone(),
@@ -213,14 +246,13 @@ pub(crate) async fn run_build_pipeline(
{
Ok(result) => {
info!(
- %site_name,
- build_dir = %result.build_dir.display(),
- duration_secs = result.duration.as_secs(),
- "pipeline completed"
+ "[{site_name}] pipeline completed: build_dir={} duration_secs={}",
+ result.build_dir.display(),
+ result.duration.as_secs()
);
}
Err(e) => {
- error!(%site_name, error = %e, "pipeline failed");
+ error!("[{site_name}] pipeline failed: {e}");
}
}
}
@@ -239,38 +271,41 @@ pub(crate) fn setup_sighup_handler(state: AppState) {
let config_path = state.config_path.as_ref();
match Config::load(config_path).await {
Ok(new_config) => {
- let old_sites_count = state.config.read().await.sites.len();
+ let old_sites_count = state
+ .config
+ .read()
+ .expect("config lock poisoned")
+ .sites
+ .len();
let new_sites_count = new_config.sites.len();
// Check for non-reloadable changes and capture old values
let (old_listen, old_base, old_log_dir, old_log_level) = {
- let old_config = state.config.read().await;
+ let old_config = state.config.read().expect("config lock poisoned");
if old_config.listen_address != new_config.listen_address {
warn!(
- old = %old_config.listen_address,
- new = %new_config.listen_address,
- "listen_address changed but cannot be reloaded (restart required)"
+ "listen_address changed but cannot be reloaded (restart required): old={} new={}",
+ old_config.listen_address, new_config.listen_address
);
}
if old_config.base_dir != new_config.base_dir {
warn!(
- old = %old_config.base_dir.display(),
- new = %new_config.base_dir.display(),
- "base_dir changed but cannot be reloaded (restart required)"
+ "base_dir changed but cannot be reloaded (restart required): old={} new={}",
+ old_config.base_dir.display(),
+ new_config.base_dir.display()
);
}
if old_config.log_dir != new_config.log_dir {
warn!(
- old = %old_config.log_dir.display(),
- new = %new_config.log_dir.display(),
- "log_dir changed but cannot be reloaded (restart required)"
+ "log_dir changed but cannot be reloaded (restart required): old={} new={}",
+ old_config.log_dir.display(),
+ new_config.log_dir.display()
);
}
if old_config.log_level != new_config.log_level {
warn!(
- old = %old_config.log_level,
- new = %new_config.log_level,
- "log_level changed but cannot be reloaded (restart required)"
+ "log_level changed but cannot be reloaded (restart required): old={} new={}",
+ old_config.log_level, new_config.log_level
);
}
(
@@ -289,7 +324,7 @@ pub(crate) fn setup_sighup_handler(state: AppState) {
final_config.log_level = old_log_level;
// Apply the merged configuration
- *state.config.write().await = final_config;
+ *state.config.write().expect("config lock poisoned") = final_config;
// Restart polling tasks with new configuration
info!("restarting polling tasks");
@@ -297,12 +332,11 @@ pub(crate) fn setup_sighup_handler(state: AppState) {
state.polling_manager.start_polling(state.clone()).await;
info!(
- old_sites_count,
- new_sites_count, "configuration reloaded successfully"
+ "configuration reloaded successfully: old_sites_count={old_sites_count} new_sites_count={new_sites_count}"
);
}
Err(e) => {
- error!(error = %e, "failed to reload configuration, keeping current config");
+ error!("failed to reload configuration, keeping current config: {e}");
}
}
}
@@ -315,28 +349,14 @@ pub(crate) fn setup_sighup_handler(state: AppState) {
///
/// Returns an error if the TCP listener cannot bind or the server encounters
/// a fatal I/O error.
-///
-/// # Panics
-///
-/// Panics if `rate_limit_per_minute` is zero. This is unreachable after
-/// successful config validation.
pub async fn run(config: Config, config_path: PathBuf) -> Result<()> {
let addr = config.parsed_listen_address();
- #[allow(clippy::expect_used)] // validated by Config::validate_rate_limit()
- let quota = Quota::per_minute(
- NonZeroU32::new(config.rate_limit_per_minute)
- .expect("rate_limit_per_minute must be greater than 0"),
- );
- let rate_limiter = Arc::new(RateLimiter::dashmap(quota));
- let polling_manager = Arc::new(PollingManager::new());
-
let state = AppState {
config: Arc::new(RwLock::new(config)),
config_path: Arc::new(config_path),
build_scheduler: Arc::new(BuildScheduler::new()),
- rate_limiter,
- polling_manager,
+ polling_manager: Arc::new(PollingManager::new()),
};
// Setup SIGHUP handler for configuration hot-reload
@@ -345,37 +365,54 @@ pub async fn run(config: Config, config_path: PathBuf) -> Result<()> {
// Start polling tasks for sites with poll_interval configured
state.polling_manager.start_polling(state.clone()).await;
- let listener = TcpListener::bind(addr).await?;
- info!(%addr, "server listening");
+ let server = Arc::new(Server::http(addr).map_err(|e| anyhow::anyhow!("failed to bind: {e}"))?);
+ info!("server listening on {addr}");
- run_with_listener(state, listener, async {
+ // Shutdown handler: signal → unblock server
+ let shutdown_server = Arc::clone(&server);
+ tokio::spawn(async move {
let mut sigterm = signal(SignalKind::terminate()).expect("failed to setup SIGTERM handler");
let mut sigint = signal(SignalKind::interrupt()).expect("failed to setup SIGINT handler");
tokio::select! {
_ = sigterm.recv() => info!("received SIGTERM, shutting down"),
_ = sigint.recv() => info!("received SIGINT, shutting down"),
}
+ shutdown_server.unblock();
+ });
+
+ // Run HTTP loop on blocking thread
+ let handle = tokio::runtime::Handle::current();
+ tokio::task::spawn_blocking(move || {
+ handle_requests(server, state, handle);
})
- .await
+ .await?;
+
+ Ok(())
}
-/// Run the server on an already-bound listener with a custom shutdown signal.
+/// Run the server with a pre-built Server, shutting down when `shutdown_signal` resolves.
///
-/// This is the core server loop used by both production (`run`) and integration tests.
-/// Production delegates here after binding the listener and setting up SIGHUP handlers.
-/// Tests call this via `test_support::run_server` with their own listener and shutdown channel.
-pub(crate) async fn run_with_listener(
+/// Used by integration tests via [`test_support::run_server`].
+/// Returns a `std::thread::JoinHandle` for the request-handling thread.
+#[cfg(any(test, feature = "integration"))]
+pub(crate) fn run_with_server(
state: AppState,
- listener: TcpListener,
+ server: Arc<Server>,
shutdown_signal: impl std::future::Future<Output = ()> + Send + 'static,
-) -> Result<()> {
- let router = create_router(state);
+) -> std::thread::JoinHandle<()> {
+ let handle = tokio::runtime::Handle::current();
- axum::serve(listener, router)
- .with_graceful_shutdown(shutdown_signal)
- .await?;
+ // Shutdown: wait for signal, then unblock
+ let shutdown_server = Arc::clone(&server);
+ tokio::spawn(async move {
+ shutdown_signal.await;
+ shutdown_server.unblock();
+ });
- Ok(())
+ // Spawn request handler on std::thread, return handle for joining
+ std::thread::spawn(move || {
+ handle_requests(server, state, handle);
+ })
}
#[cfg(test)]
@@ -383,23 +420,13 @@ pub(crate) async fn run_with_listener(
mod tests {
use super::*;
use crate::config::{BuildOverrides, SiteConfig};
- use axum::body::Body;
- use axum::http::{Request, StatusCode};
- use axum::response::Response;
use std::path::PathBuf;
- use tower::ServiceExt as _;
fn test_state(config: Config) -> AppState {
- test_state_with_rate_limit(config, 1000) // High limit for most tests
- }
-
- fn test_state_with_rate_limit(config: Config, rate_limit: u32) -> AppState {
- let quota = Quota::per_minute(NonZeroU32::new(rate_limit).unwrap());
AppState {
config: Arc::new(RwLock::new(config)),
config_path: Arc::new(PathBuf::from("witryna.toml")),
build_scheduler: Arc::new(BuildScheduler::new()),
- rate_limiter: Arc::new(RateLimiter::dashmap(quota)),
polling_manager: Arc::new(PollingManager::new()),
}
}
@@ -411,7 +438,6 @@ mod tests {
base_dir: PathBuf::from("/var/lib/witryna"),
log_dir: PathBuf::from("/var/log/witryna"),
log_level: "info".to_owned(),
- rate_limit_per_minute: 10,
max_builds_to_keep: 5,
git_timeout: None,
sites: vec![],
@@ -445,221 +471,169 @@ mod tests {
}
}
+ /// Start a test server on a random port, returning the server handle, state, and port.
+ fn test_server(config: Config) -> (Arc<Server>, AppState, u16) {
+ let state = test_state(config);
+ let server = Arc::new(Server::http("127.0.0.1:0").unwrap());
+ let port = match server.server_addr() {
+ tiny_http::ListenAddr::IP(a) => a.port(),
+ _ => unreachable!("expected IP address"),
+ };
+ let handle = tokio::runtime::Handle::current();
+ let server_clone = server.clone();
+ let state_clone = state.clone();
+ std::thread::spawn(move || handle_requests(server_clone, state_clone, handle));
+ (server, state, port)
+ }
+
#[tokio::test]
async fn health_endpoint_returns_ok() {
- let state = test_state(test_config_with_sites());
- let router = create_router(state);
-
- let response: Response = router
- .oneshot(
- Request::builder()
- .uri("/health")
- .body(Body::empty())
- .unwrap(),
- )
+ let (server, _state, port) = test_server(test_config_with_sites());
+ let resp = reqwest::get(format!("http://127.0.0.1:{port}/health"))
.await
.unwrap();
-
- assert_eq!(response.status(), StatusCode::OK);
- let body = axum::body::to_bytes(response.into_body(), 1024)
- .await
- .unwrap();
- let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
+ assert_eq!(resp.status().as_u16(), 200);
+ let json: serde_json::Value = resp.json().await.unwrap();
assert_eq!(json["status"], "ok");
+ server.unblock();
}
#[tokio::test]
- async fn unknown_site_post_returns_not_found() {
- let state = test_state(test_config());
- let router = create_router(state);
-
- let response: Response = router
- .oneshot(
- Request::builder()
- .method("POST")
- .uri("/nonexistent")
- .body(Body::empty())
- .unwrap(),
- )
- .await
- .unwrap();
+ async fn json_responses_have_content_type_header() {
+ let (server, _state, port) = test_server(test_config_with_sites());
+ let resp = reqwest::get(format!("http://127.0.0.1:{port}/health"))
+ .await
+ .unwrap();
+ assert_eq!(
+ resp.headers()
+ .get("content-type")
+ .unwrap()
+ .to_str()
+ .unwrap(),
+ "application/json"
+ );
+ server.unblock();
+ }
- assert_eq!(response.status(), StatusCode::NOT_FOUND);
- let body = axum::body::to_bytes(response.into_body(), 1024)
+ #[tokio::test]
+ async fn unknown_site_post_returns_not_found() {
+ let (server, _state, port) = test_server(test_config());
+ let client = reqwest::Client::new();
+ let resp = client
+ .post(format!("http://127.0.0.1:{port}/nonexistent"))
+ .send()
.await
.unwrap();
- let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
+ assert_eq!(resp.status().as_u16(), 404);
+ let json: serde_json::Value = resp.json().await.unwrap();
assert_eq!(json["error"], "not_found");
+ server.unblock();
}
#[tokio::test]
async fn deploy_known_site_with_valid_token_returns_accepted() {
- let state = test_state(test_config_with_sites());
- let router = create_router(state);
-
- let response: Response = router
- .oneshot(
- Request::builder()
- .method("POST")
- .uri("/my-site")
- .header("Authorization", "Bearer secret-token")
- .body(Body::empty())
- .unwrap(),
- )
- .await
- .unwrap();
-
- assert_eq!(response.status(), StatusCode::ACCEPTED);
- let body = axum::body::to_bytes(response.into_body(), 1024)
+ let (server, _state, port) = test_server(test_config_with_sites());
+ let client = reqwest::Client::new();
+ let resp = client
+ .post(format!("http://127.0.0.1:{port}/my-site"))
+ .header("Authorization", "Bearer secret-token")
+ .send()
.await
.unwrap();
- assert!(body.is_empty());
+ assert_eq!(resp.status().as_u16(), 202);
+ server.unblock();
}
#[tokio::test]
async fn deploy_missing_auth_header_returns_unauthorized() {
- let state = test_state(test_config_with_sites());
- let router = create_router(state);
-
- let response: Response = router
- .oneshot(
- Request::builder()
- .method("POST")
- .uri("/my-site")
- .body(Body::empty())
- .unwrap(),
- )
- .await
- .unwrap();
-
- assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
- let body = axum::body::to_bytes(response.into_body(), 1024)
+ let (server, _state, port) = test_server(test_config_with_sites());
+ let client = reqwest::Client::new();
+ let resp = client
+ .post(format!("http://127.0.0.1:{port}/my-site"))
+ .send()
.await
.unwrap();
- let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
+ assert_eq!(resp.status().as_u16(), 401);
+ let json: serde_json::Value = resp.json().await.unwrap();
assert_eq!(json["error"], "unauthorized");
+ server.unblock();
}
#[tokio::test]
async fn deploy_invalid_token_returns_unauthorized() {
- let state = test_state(test_config_with_sites());
- let router = create_router(state);
-
- let response: Response = router
- .oneshot(
- Request::builder()
- .method("POST")
- .uri("/my-site")
- .header("Authorization", "Bearer wrong-token")
- .body(Body::empty())
- .unwrap(),
- )
+ let (server, _state, port) = test_server(test_config_with_sites());
+ let client = reqwest::Client::new();
+ let resp = client
+ .post(format!("http://127.0.0.1:{port}/my-site"))
+ .header("Authorization", "Bearer wrong-token")
+ .send()
.await
.unwrap();
-
- assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
- let body = axum::body::to_bytes(response.into_body(), 1024)
- .await
- .unwrap();
- let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
+ assert_eq!(resp.status().as_u16(), 401);
+ let json: serde_json::Value = resp.json().await.unwrap();
assert_eq!(json["error"], "unauthorized");
+ server.unblock();
}
#[tokio::test]
async fn deploy_malformed_auth_header_returns_unauthorized() {
- let state = test_state(test_config_with_sites());
- let router = create_router(state);
-
+ let (server, _state, port) = test_server(test_config_with_sites());
+ let client = reqwest::Client::new();
// Test without "Bearer " prefix
- let response: Response = router
- .oneshot(
- Request::builder()
- .method("POST")
- .uri("/my-site")
- .header("Authorization", "secret-token")
- .body(Body::empty())
- .unwrap(),
- )
+ let resp = client
+ .post(format!("http://127.0.0.1:{port}/my-site"))
+ .header("Authorization", "secret-token")
+ .send()
.await
.unwrap();
-
- assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
- let body = axum::body::to_bytes(response.into_body(), 1024)
- .await
- .unwrap();
- let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
+ assert_eq!(resp.status().as_u16(), 401);
+ let json: serde_json::Value = resp.json().await.unwrap();
assert_eq!(json["error"], "unauthorized");
+ server.unblock();
}
#[tokio::test]
async fn deploy_basic_auth_returns_unauthorized() {
- let state = test_state(test_config_with_sites());
- let router = create_router(state);
-
+ let (server, _state, port) = test_server(test_config_with_sites());
+ let client = reqwest::Client::new();
// Test Basic auth instead of Bearer
- let response: Response = router
- .oneshot(
- Request::builder()
- .method("POST")
- .uri("/my-site")
- .header("Authorization", "Basic dXNlcjpwYXNz")
- .body(Body::empty())
- .unwrap(),
- )
- .await
- .unwrap();
-
- assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
- let body = axum::body::to_bytes(response.into_body(), 1024)
+ let resp = client
+ .post(format!("http://127.0.0.1:{port}/my-site"))
+ .header("Authorization", "Basic dXNlcjpwYXNz")
+ .send()
.await
.unwrap();
- let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
+ assert_eq!(resp.status().as_u16(), 401);
+ let json: serde_json::Value = resp.json().await.unwrap();
assert_eq!(json["error"], "unauthorized");
+ server.unblock();
}
#[tokio::test]
async fn deploy_get_method_not_allowed() {
- let state = test_state(test_config_with_sites());
- let router = create_router(state);
-
- let response: Response = router
- .oneshot(
- Request::builder()
- .method("GET")
- .uri("/my-site")
- .body(Body::empty())
- .unwrap(),
- )
+ let (server, _state, port) = test_server(test_config_with_sites());
+ let resp = reqwest::get(format!("http://127.0.0.1:{port}/my-site"))
.await
.unwrap();
-
- assert_eq!(response.status(), StatusCode::METHOD_NOT_ALLOWED);
+ assert_eq!(resp.status().as_u16(), 405);
+ server.unblock();
}
#[tokio::test]
async fn deploy_unknown_site_with_token_returns_not_found() {
- let state = test_state(test_config_with_sites());
- let router = create_router(state);
-
- let response: Response = router
- .oneshot(
- Request::builder()
- .method("POST")
- .uri("/unknown-site")
- .header("Authorization", "Bearer any-token")
- .body(Body::empty())
- .unwrap(),
- )
+ let (server, _state, port) = test_server(test_config_with_sites());
+ let client = reqwest::Client::new();
+ let resp = client
+ .post(format!("http://127.0.0.1:{port}/unknown-site"))
+ .header("Authorization", "Bearer any-token")
+ .send()
.await
.unwrap();
-
// Returns 404 before checking token (site lookup first)
- assert_eq!(response.status(), StatusCode::NOT_FOUND);
- let body = axum::body::to_bytes(response.into_body(), 1024)
- .await
- .unwrap();
- let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
+ assert_eq!(resp.status().as_u16(), 404);
+ let json: serde_json::Value = resp.json().await.unwrap();
assert_eq!(json["error"], "not_found");
+ server.unblock();
}
fn test_config_with_two_sites() -> Config {
@@ -669,7 +643,6 @@ mod tests {
base_dir: PathBuf::from("/var/lib/witryna"),
log_dir: PathBuf::from("/var/log/witryna"),
log_level: "info".to_owned(),
- rate_limit_per_minute: 10,
max_builds_to_keep: 5,
git_timeout: None,
sites: vec![
@@ -721,290 +694,92 @@ mod tests {
#[tokio::test]
async fn deploy_concurrent_same_site_gets_queued() {
- let state = test_state(test_config_with_sites());
- let router = create_router(state.clone());
-
- // First request should succeed (immediate build)
- let response1: Response = router
- .clone()
- .oneshot(
- Request::builder()
- .method("POST")
- .uri("/my-site")
- .header("Authorization", "Bearer secret-token")
- .body(Body::empty())
- .unwrap(),
- )
- .await
- .unwrap();
- assert_eq!(response1.status(), StatusCode::ACCEPTED);
- let body1 = axum::body::to_bytes(response1.into_body(), 1024)
- .await
- .unwrap();
- assert!(body1.is_empty());
-
- // Second request to same site should be queued (202 with body)
- let response2: Response = router
- .clone()
- .oneshot(
- Request::builder()
- .method("POST")
- .uri("/my-site")
- .header("Authorization", "Bearer secret-token")
- .body(Body::empty())
- .unwrap(),
- )
- .await
- .unwrap();
- assert_eq!(response2.status(), StatusCode::ACCEPTED);
- let body2 = axum::body::to_bytes(response2.into_body(), 1024)
+ let (server, state, port) = test_server(test_config_with_sites());
+ let client = reqwest::Client::new();
+
+ // Pre-mark site as building to simulate an in-progress build
+ state
+ .build_scheduler
+ .in_progress
+ .lock()
+ .unwrap()
+ .insert("my-site".to_owned());
+
+ // First request to same site should be queued (202 with body)
+ let resp1 = client
+ .post(format!("http://127.0.0.1:{port}/my-site"))
+ .header("Authorization", "Bearer secret-token")
+ .send()
.await
.unwrap();
- let json: serde_json::Value = serde_json::from_slice(&body2).unwrap();
+ assert_eq!(resp1.status().as_u16(), 202);
+ let json: serde_json::Value = resp1.json().await.unwrap();
assert_eq!(json["status"], "queued");
- // Third request should be collapsed (202, no body)
- let response3: Response = router
- .oneshot(
- Request::builder()
- .method("POST")
- .uri("/my-site")
- .header("Authorization", "Bearer secret-token")
- .body(Body::empty())
- .unwrap(),
- )
- .await
- .unwrap();
- assert_eq!(response3.status(), StatusCode::ACCEPTED);
- let body3 = axum::body::to_bytes(response3.into_body(), 1024)
+ // Second request should be collapsed (202, no body)
+ let resp2 = client
+ .post(format!("http://127.0.0.1:{port}/my-site"))
+ .header("Authorization", "Bearer secret-token")
+ .send()
.await
.unwrap();
- assert!(body3.is_empty());
+ assert_eq!(resp2.status().as_u16(), 202);
+
+ server.unblock();
}
#[tokio::test]
async fn deploy_concurrent_different_sites_both_succeed() {
- let state = test_state(test_config_with_two_sites());
- let router = create_router(state.clone());
+ let (server, _state, port) = test_server(test_config_with_two_sites());
+ let client = reqwest::Client::new();
// First site deployment
- let response1: Response = router
- .clone()
- .oneshot(
- Request::builder()
- .method("POST")
- .uri("/site-one")
- .header("Authorization", "Bearer token-one")
- .body(Body::empty())
- .unwrap(),
- )
+ let resp1 = client
+ .post(format!("http://127.0.0.1:{port}/site-one"))
+ .header("Authorization", "Bearer token-one")
+ .send()
.await
.unwrap();
- assert_eq!(response1.status(), StatusCode::ACCEPTED);
+ assert_eq!(resp1.status().as_u16(), 202);
// Second site deployment should also succeed
- let response2: Response = router
- .oneshot(
- Request::builder()
- .method("POST")
- .uri("/site-two")
- .header("Authorization", "Bearer token-two")
- .body(Body::empty())
- .unwrap(),
- )
+ let resp2 = client
+ .post(format!("http://127.0.0.1:{port}/site-two"))
+ .header("Authorization", "Bearer token-two")
+ .send()
.await
.unwrap();
- assert_eq!(response2.status(), StatusCode::ACCEPTED);
+ assert_eq!(resp2.status().as_u16(), 202);
+
+ server.unblock();
}
#[tokio::test]
async fn deploy_site_in_progress_checked_after_auth() {
- let state = test_state(test_config_with_sites());
+ let (server, state, port) = test_server(test_config_with_sites());
// Pre-mark site as building
state
.build_scheduler
.in_progress
+ .lock()
+ .unwrap()
.insert("my-site".to_owned());
- let router = create_router(state);
+ let client = reqwest::Client::new();
// Request with wrong token should return 401 (auth checked before build status)
- let response: Response = router
- .oneshot(
- Request::builder()
- .method("POST")
- .uri("/my-site")
- .header("Authorization", "Bearer wrong-token")
- .body(Body::empty())
- .unwrap(),
- )
+ let resp = client
+ .post(format!("http://127.0.0.1:{port}/my-site"))
+ .header("Authorization", "Bearer wrong-token")
+ .send()
.await
.unwrap();
- assert_eq!(response.status(), StatusCode::UNAUTHORIZED);
- let body = axum::body::to_bytes(response.into_body(), 1024)
- .await
- .unwrap();
- let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
+ assert_eq!(resp.status().as_u16(), 401);
+ let json: serde_json::Value = resp.json().await.unwrap();
assert_eq!(json["error"], "unauthorized");
- }
- #[tokio::test]
- async fn rate_limit_exceeded_returns_429() {
- // Create state with rate limit of 2 per minute
- let state = test_state_with_rate_limit(test_config_with_sites(), 2);
- let router = create_router(state);
-
- // First request should succeed
- let response1: Response = router
- .clone()
- .oneshot(
- Request::builder()
- .method("POST")
- .uri("/my-site")
- .header("Authorization", "Bearer secret-token")
- .body(Body::empty())
- .unwrap(),
- )
- .await
- .unwrap();
- assert_eq!(response1.status(), StatusCode::ACCEPTED);
-
- // Second request should succeed (or 409 if build in progress)
- let response2: Response = router
- .clone()
- .oneshot(
- Request::builder()
- .method("POST")
- .uri("/my-site")
- .header("Authorization", "Bearer secret-token")
- .body(Body::empty())
- .unwrap(),
- )
- .await
- .unwrap();
- // Could be 202 or 409 depending on timing
- assert!(
- response2.status() == StatusCode::ACCEPTED
- || response2.status() == StatusCode::CONFLICT
- );
-
- // Third request should hit rate limit
- let response3: Response = router
- .oneshot(
- Request::builder()
- .method("POST")
- .uri("/my-site")
- .header("Authorization", "Bearer secret-token")
- .body(Body::empty())
- .unwrap(),
- )
- .await
- .unwrap();
- assert_eq!(response3.status(), StatusCode::TOO_MANY_REQUESTS);
- let body = axum::body::to_bytes(response3.into_body(), 1024)
- .await
- .unwrap();
- let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
- assert_eq!(json["error"], "rate_limit_exceeded");
- }
-
- #[tokio::test]
- async fn rate_limit_different_tokens_independent() {
- // Create state with rate limit of 1 per minute
- let state = test_state_with_rate_limit(test_config_with_two_sites(), 1);
- let router = create_router(state);
-
- // First request with token-one should succeed
- let response1: Response = router
- .clone()
- .oneshot(
- Request::builder()
- .method("POST")
- .uri("/site-one")
- .header("Authorization", "Bearer token-one")
- .body(Body::empty())
- .unwrap(),
- )
- .await
- .unwrap();
- assert_eq!(response1.status(), StatusCode::ACCEPTED);
-
- // Second request with token-one should hit rate limit
- let response2: Response = router
- .clone()
- .oneshot(
- Request::builder()
- .method("POST")
- .uri("/site-one")
- .header("Authorization", "Bearer token-one")
- .body(Body::empty())
- .unwrap(),
- )
- .await
- .unwrap();
- assert_eq!(response2.status(), StatusCode::TOO_MANY_REQUESTS);
- let body = axum::body::to_bytes(response2.into_body(), 1024)
- .await
- .unwrap();
- let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
- assert_eq!(json["error"], "rate_limit_exceeded");
-
- // Request with different token should still succeed
- let response3: Response = router
- .oneshot(
- Request::builder()
- .method("POST")
- .uri("/site-two")
- .header("Authorization", "Bearer token-two")
- .body(Body::empty())
- .unwrap(),
- )
- .await
- .unwrap();
- assert_eq!(response3.status(), StatusCode::ACCEPTED);
- }
-
- #[tokio::test]
- async fn rate_limit_checked_after_auth() {
- // Create state with rate limit of 1 per minute
- let state = test_state_with_rate_limit(test_config_with_sites(), 1);
- let router = create_router(state);
-
- // First valid request exhausts rate limit
- let response1: Response = router
- .clone()
- .oneshot(
- Request::builder()
- .method("POST")
- .uri("/my-site")
- .header("Authorization", "Bearer secret-token")
- .body(Body::empty())
- .unwrap(),
- )
- .await
- .unwrap();
- assert_eq!(response1.status(), StatusCode::ACCEPTED);
-
- // Request with invalid token should return 401, not 429
- // (auth is checked before rate limit)
- let response2: Response = router
- .oneshot(
- Request::builder()
- .method("POST")
- .uri("/my-site")
- .header("Authorization", "Bearer wrong-token")
- .body(Body::empty())
- .unwrap(),
- )
- .await
- .unwrap();
- assert_eq!(response2.status(), StatusCode::UNAUTHORIZED);
- let body = axum::body::to_bytes(response2.into_body(), 1024)
- .await
- .unwrap();
- let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
- assert_eq!(json["error"], "unauthorized");
+ server.unblock();
}
#[tokio::test]
@@ -1016,7 +791,6 @@ mod tests {
base_dir: PathBuf::from("/var/lib/witryna"),
log_dir: PathBuf::from("/var/log/witryna"),
log_level: "info".to_owned(),
- rate_limit_per_minute: 10,
max_builds_to_keep: 5,
git_timeout: None,
sites: vec![SiteConfig {
@@ -1052,7 +826,6 @@ mod tests {
base_dir: PathBuf::from("/tmp/new-base"),
log_dir: PathBuf::from("/tmp/new-logs"),
log_level: "debug".to_owned(),
- rate_limit_per_minute: 20,
max_builds_to_keep: 10,
git_timeout: None,
sites: vec![SiteConfig {
@@ -1080,7 +853,7 @@ mod tests {
// Apply the same merge logic used in setup_sighup_handler
let (old_listen, old_base, old_log_dir, old_log_level) = {
- let old_config = state.config.read().await;
+ let old_config = state.config.read().unwrap();
(
old_config.listen_address.clone(),
old_config.base_dir.clone(),
@@ -1095,21 +868,30 @@ mod tests {
final_config.log_dir = old_log_dir;
final_config.log_level = old_log_level;
- *state.config.write().await = final_config;
-
- // Verify non-reloadable fields are preserved
- let config = state.config.read().await;
- assert_eq!(config.listen_address, "127.0.0.1:8080");
- assert_eq!(config.base_dir, PathBuf::from("/var/lib/witryna"));
- assert_eq!(config.log_dir, PathBuf::from("/var/log/witryna"));
- assert_eq!(config.log_level, "info");
-
- // Verify reloadable fields are updated
- assert_eq!(config.container_runtime, "docker");
- assert_eq!(config.rate_limit_per_minute, 20);
- assert_eq!(config.max_builds_to_keep, 10);
- assert_eq!(config.sites.len(), 1);
- assert_eq!(config.sites[0].name, "new-site");
+ *state.config.write().unwrap() = final_config;
+
+ // Verify non-reloadable fields are preserved and reloadable fields are updated
+ let (listen, base, log_d, log_l, runtime, max_builds, sites_len, site_name) = {
+ let config = state.config.read().unwrap();
+ (
+ config.listen_address.clone(),
+ config.base_dir.clone(),
+ config.log_dir.clone(),
+ config.log_level.clone(),
+ config.container_runtime.clone(),
+ config.max_builds_to_keep,
+ config.sites.len(),
+ config.sites[0].name.clone(),
+ )
+ };
+ assert_eq!(listen, "127.0.0.1:8080");
+ assert_eq!(base, PathBuf::from("/var/lib/witryna"));
+ assert_eq!(log_d, PathBuf::from("/var/log/witryna"));
+ assert_eq!(log_l, "info");
+ assert_eq!(runtime, "docker");
+ assert_eq!(max_builds, 10);
+ assert_eq!(sites_len, 1);
+ assert_eq!(site_name, "new-site");
}
fn test_config_with_disabled_auth() -> Config {
@@ -1140,80 +922,34 @@ mod tests {
#[tokio::test]
async fn deploy_disabled_auth_returns_accepted() {
- let state = test_state(test_config_with_disabled_auth());
- let router = create_router(state);
+ let (server, _state, port) = test_server(test_config_with_disabled_auth());
+ let client = reqwest::Client::new();
// Request without Authorization header should succeed
- let response: Response = router
- .oneshot(
- Request::builder()
- .method("POST")
- .uri("/open-site")
- .body(Body::empty())
- .unwrap(),
- )
+ let resp = client
+ .post(format!("http://127.0.0.1:{port}/open-site"))
+ .send()
.await
.unwrap();
+ assert_eq!(resp.status().as_u16(), 202);
- assert_eq!(response.status(), StatusCode::ACCEPTED);
+ server.unblock();
}
#[tokio::test]
async fn deploy_disabled_auth_ignores_token() {
- let state = test_state(test_config_with_disabled_auth());
- let router = create_router(state);
+ let (server, _state, port) = test_server(test_config_with_disabled_auth());
+ let client = reqwest::Client::new();
// Request WITH a Bearer token should also succeed (token ignored)
- let response: Response = router
- .oneshot(
- Request::builder()
- .method("POST")
- .uri("/open-site")
- .header("Authorization", "Bearer any-token")
- .body(Body::empty())
- .unwrap(),
- )
+ let resp = client
+ .post(format!("http://127.0.0.1:{port}/open-site"))
+ .header("Authorization", "Bearer any-token")
+ .send()
.await
.unwrap();
+ assert_eq!(resp.status().as_u16(), 202);
- assert_eq!(response.status(), StatusCode::ACCEPTED);
- }
-
- #[tokio::test]
- async fn deploy_disabled_auth_rate_limited_by_site_name() {
- let state = test_state_with_rate_limit(test_config_with_disabled_auth(), 1);
- let router = create_router(state);
-
- // First request should succeed
- let response1: Response = router
- .clone()
- .oneshot(
- Request::builder()
- .method("POST")
- .uri("/open-site")
- .body(Body::empty())
- .unwrap(),
- )
- .await
- .unwrap();
- assert_eq!(response1.status(), StatusCode::ACCEPTED);
-
- // Second request should hit rate limit (keyed by site name)
- let response2: Response = router
- .oneshot(
- Request::builder()
- .method("POST")
- .uri("/open-site")
- .body(Body::empty())
- .unwrap(),
- )
- .await
- .unwrap();
- assert_eq!(response2.status(), StatusCode::TOO_MANY_REQUESTS);
- let body = axum::body::to_bytes(response2.into_body(), 1024)
- .await
- .unwrap();
- let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
- assert_eq!(json["error"], "rate_limit_exceeded");
+ server.unblock();
}
}