diff options
| author | Dawid Rycerz <dawid@rycerz.xyz> | 2026-02-08 12:44:10 +0100 |
|---|---|---|
| committer | Dawid Rycerz <dawid@rycerz.xyz> | 2026-02-08 12:44:10 +0100 |
| commit | 0c20fb86633104744dbccf30ad732296694fff1b (patch) | |
| tree | 02ffb8494086960b4a84decf3bdc2c8c61bfc4f6 /src/video/pipewire.rs | |
Initial pipewiremain
Diffstat (limited to 'src/video/pipewire.rs')
| -rw-r--r-- | src/video/pipewire.rs | 773 |
1 files changed, 773 insertions, 0 deletions
diff --git a/src/video/pipewire.rs b/src/video/pipewire.rs new file mode 100644 index 0000000..62dad11 --- /dev/null +++ b/src/video/pipewire.rs @@ -0,0 +1,773 @@ +//! PipeWire backend for video streaming using native library + +use super::{VideoBackendTrait, VideoFormat, VideoStats}; +use crate::error::{Result, VideoError}; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, AtomicU32, Ordering, Ordering as AtomicOrdering}; +use std::sync::Mutex; +use std::time::Instant; +use tracing::{debug, info, trace, error, warn}; +use std::thread; +use std::sync::mpsc::{self, Sender, Receiver}; + +// PipeWire imports +use pipewire::{ + main_loop::MainLoop, + context::Context, + stream::{Stream, StreamFlags, StreamState}, + properties::properties, + keys, + spa::pod::{self, Pod, Object}, + spa::utils::{Direction, SpaTypes, Fraction, Rectangle}, + spa::param::ParamType, + spa::param::format::{FormatProperties, MediaSubtype, MediaType}, + spa::pod::serialize::PodSerializer, +}; + +/// PipeWire backend implementation using native library +pub struct PipeWireBackend { + is_initialized: bool, + stats: Arc<Mutex<VideoStats>>, + config: PipeWireConfig, + running: Arc<AtomicBool>, + virtual_node_id: Option<u32>, + pw_frame_sender: Option<Sender<Vec<u8>>>, // Separate sender for PipeWire thread + stats_frame_sender: Option<Sender<Vec<u8>>>, // Separate sender for stats thread + last_frame_time: Arc<Mutex<Instant>>, + + // PipeWire objects - these need to be in a separate thread-safe context + pw_thread: Option<thread::JoinHandle<()>>, +} + +/// PipeWire configuration +#[derive(Debug, Clone)] +pub struct PipeWireConfig { + pub node_name: String, + pub description: String, + pub media_class: String, + pub format: VideoFormat, + pub width: u32, + pub height: u32, + pub framerate: u32, +} + +impl Default for PipeWireConfig { + fn default() -> Self { + Self { + node_name: "geek-szitman-supercamera".to_string(), + description: "Geek Szitman SuperCamera - High-quality virtual camera for streaming and recording".to_string(), + media_class: "Video/Source".to_string(), + format: VideoFormat::MJPEG, // Changed back to MJPEG since that's what the camera provides + width: 640, + height: 480, + framerate: 30, + } + } +} + +impl PipeWireBackend { + pub fn new(config: PipeWireConfig) -> Self { + Self { + is_initialized: false, + stats: Arc::new(Mutex::new(VideoStats::default())), + config, + running: Arc::new(AtomicBool::new(false)), + virtual_node_id: None, + pw_frame_sender: None, + stats_frame_sender: None, + last_frame_time: Arc::new(Mutex::new(Instant::now())), + pw_thread: None, + } + } + + /// Check if PipeWire is available and running + fn check_pipewire_available(&self) -> Result<()> { + info!("Checking PipeWire availability..."); + // This is a basic check - in a real implementation you might want to + // try to connect to the daemon to verify it's actually running + info!("PipeWire availability check passed"); + Ok(()) + } + + /// Create a virtual camera node using native PipeWire API + fn create_virtual_camera_node(&mut self) -> Result<()> { + info!("Creating PipeWire virtual camera node using native API..."); + info!("Node name: '{}'", self.config.node_name); + info!("Node description: '{}'", self.config.description); + + // Start PipeWire processing in a separate thread to avoid Send/Sync issues + // The actual node creation and availability will be logged in the PipeWire thread + // Ensure the processing loop runs + self.running.store(true, Ordering::Relaxed); + let running = Arc::clone(&self.running); + let config = self.config.clone(); + + // Create channel for frame communication with PipeWire thread + let (frame_sender, frame_receiver) = mpsc::channel(); + self.pw_frame_sender = Some(frame_sender); + + info!("Starting PipeWire thread..."); + let handle = thread::spawn(move || { + info!("PipeWire thread started, entering main loop..."); + // Set panic hook to catch any panics in this thread + std::panic::set_hook(Box::new(|panic_info| { + error!("PipeWire thread panicked: {:?}", panic_info); + })); + + Self::pipewire_main_loop(running, config, frame_receiver); + info!("PipeWire thread exiting..."); + }); + + self.pw_thread = Some(handle); + self.virtual_node_id = Some(999); // Placeholder - will be updated when stream is ready + info!("Virtual camera node creation initiated in separate thread"); + Ok(()) + } + + /// Main PipeWire loop that runs in a separate thread + fn pipewire_main_loop(running: Arc<AtomicBool>, config: PipeWireConfig, frame_receiver: Receiver<Vec<u8>>) { + info!("Starting PipeWire main loop in thread"); + + info!("Initializing PipeWire..."); + pipewire::init(); + info!("PipeWire initialized successfully"); + + // Create main loop with no properties + info!("Creating PipeWire main loop..."); + let mainloop = match MainLoop::new(None) { + Ok(ml) => { + info!("Main loop created successfully"); + ml + }, + Err(e) => { + error!("Failed to create PipeWire main loop: {}", e); + error!("MainLoop::new error details: {:?}", e); + return; + } + }; + + // Create context + info!("Creating PipeWire context..."); + let context = match Context::new(&mainloop) { + Ok(ctx) => { + info!("Context created successfully"); + ctx + }, + Err(e) => { + error!("Failed to create PipeWire context: {}", e); + return; + } + }; + + // Connect to PipeWire daemon + info!("Connecting to PipeWire daemon..."); + let core = match context.connect(None) { + Ok(c) => { + info!("Connected to PipeWire daemon successfully"); + c + }, + Err(e) => { + error!("Failed to connect to PipeWire daemon: {}", e); + return; + } + }; + + info!("PipeWire connection established successfully"); + + // Set up registry listener to capture object.serial when our node appears + let serial_slot = Arc::new(AtomicU32::new(0)); + let serial_slot_clone = Arc::clone(&serial_slot); + let wanted_name = config.node_name.clone(); + + let registry = core.get_registry().expect("get_registry"); + let _reg_listener = registry + .add_listener_local() + .global(move |global_obj| { + if global_obj.type_ == pipewire::types::ObjectType::Node { + if let Some(props) = &global_obj.props { + if let Some(name) = props.get("node.name") { + if name == wanted_name { + if let Some(s) = props.get("object.serial") { + if let Ok(v) = s.parse::<u32>() { + serial_slot_clone.store(v, AtomicOrdering::SeqCst); + info!("Discovered our node in registry: node.name={} object.serial={}", name, v); + } + } + } + } + } + } + }) + .register(); + + // User data for stream callbacks + #[derive(Debug)] + struct UserData { + is_mjpeg: bool, + frame_size: u32, + stride: i32, + current_frame: Arc<Mutex<Option<Vec<u8>>>>, + } + + let current_frame = Arc::new(Mutex::new(None)); + let current_frame_clone = Arc::clone(¤t_frame); + + // Start frame receiver thread + let frame_receiver = Arc::new(Mutex::new(frame_receiver)); + let frame_receiver_clone = Arc::clone(&frame_receiver); + let running_clone = Arc::clone(&running); + let _frame_thread = thread::spawn(move || { + while running_clone.load(Ordering::Relaxed) { + let frame_data = { + let receiver_guard = frame_receiver_clone.lock().unwrap(); + match receiver_guard.recv_timeout(std::time::Duration::from_millis(16)) { + Ok(data) => Some(data), + Err(mpsc::RecvTimeoutError::Timeout) => None, + Err(mpsc::RecvTimeoutError::Disconnected) => break, + } + }; + + if let Some(frame_data) = frame_data { + let mut frame_guard = current_frame_clone.lock().unwrap(); + *frame_guard = Some(frame_data); + trace!("Received new frame for PipeWire processing"); + } + } + }); + + // Create a stream that will act as a video source + let stream = match Stream::new( + &core, + &config.node_name, + properties! { + // Essential keys for Video/Source classification + *keys::MEDIA_CLASS => "Video/Source", + *keys::NODE_NAME => config.node_name.as_str(), + *keys::APP_NAME => "geek-szitman-supercamera", + *keys::NODE_DESCRIPTION => config.description.as_str(), + // Additional metadata + "media.role" => "Camera", + "media.category" => "Capture", + // Optional cosmetics + "media.nick" => "SuperCamera", + "device.icon_name" => "camera-web", + // Prevent PipeWire from trying to drive the graph until someone connects + "node.passive" => "true", + }, + ) { + Ok(s) => s, + Err(e) => { + error!("Failed to create PipeWire stream: {}", e); + return; + } + }; + + // Build EnumFormat pod(s) - simplified to just MJPEG + let width_u = config.width as u32; + let height_u = config.height as u32; + let fps_u = config.framerate as u32; + + // MJPEG: JPEG compressed - simplified format + let enum_mjpeg = pod::object!( + SpaTypes::ObjectParamFormat, + ParamType::EnumFormat, + pod::property!(FormatProperties::MediaType, Id, MediaType::Video), + pod::property!(FormatProperties::MediaSubtype, Id, MediaSubtype::Mjpg), + pod::property!( + FormatProperties::VideoSize, + Choice, Range, Rectangle, + Rectangle { width: width_u, height: height_u }, + Rectangle { width: 16, height: 16 }, + Rectangle { width: 4096, height: 4096 } + ), + pod::property!( + FormatProperties::VideoFramerate, + Choice, Range, Fraction, + Fraction { num: fps_u, denom: 1 }, + Fraction { num: 1, denom: 1 }, + Fraction { num: 120, denom: 1 } + ), + ); + + // Clone config values for closures + let config_width = config.width; + let config_height = config.height; + let config_framerate = config.framerate; + + // Set up stream callbacks + let _listener = match stream + .add_local_listener_with_user_data(UserData { + is_mjpeg: false, + frame_size: 4 * 1024 * 1024, // safe cap + stride: 0, + current_frame: Arc::clone(¤t_frame), + }) + .state_changed(move |stream, _user_data, old, new| { + info!("PipeWire stream state: {:?} -> {:?}", old, new); + if matches!(new, StreamState::Paused | StreamState::Streaming) { + info!("PipeWire node is ready and can be targeted by applications"); + } + if new == StreamState::Paused { + if let Err(e) = stream.set_active(true) { + error!("Failed to activate PipeWire stream: {}", e); + } else { + info!("Activated stream scheduling"); + } + } + if new == StreamState::Streaming { + info!("Stream is now streaming - virtual camera is active!"); + } + }) + .param_changed(move |stream, user_data, id, param| { + if let Some(param) = param { + info!("Param changed: id={:?}, type={:?}, raw_id={}", id, param.type_(), id); + + // Handle format negotiation - simplified approach + if id == ParamType::Format.as_raw() || id == ParamType::EnumFormat.as_raw() || id == 15 { + info!("Format param received (id={}), setting up basic MJPEG format...", id); + + // Set basic MJPEG parameters + user_data.is_mjpeg = true; + user_data.frame_size = 4 * 1024 * 1024; // 4MB safe cap for MJPEG + user_data.stride = 0; // MJPEG doesn't have stride + info!("Basic MJPEG format configured: {}x{} @ {} fps", config_width, config_height, config_framerate); + + // Try to activate the stream directly + if let Err(e) = stream.set_active(true) { + error!("Failed to activate stream: {}", e); + } else { + info!("Stream activated successfully"); + } + } else { + trace!("Stream param changed: id={} (ignored)", id); + } + } else { + trace!("Stream param changed: id={} (ignored)", id); + } + }) + .process(move |stream, user_data| { + // Dequeue buffer + let Some(mut buffer) = stream.dequeue_buffer() else { + trace!("Out of buffers"); + return; + }; + + // Get the current frame from UPP protocol + let frame_data = { + let frame_guard = user_data.current_frame.lock().unwrap(); + frame_guard.clone() + }; + + if let Some(frame_data) = frame_data { + // Process actual camera frame data from UPP protocol + trace!("Processing UPP camera frame: {} bytes", frame_data.len()); + + for data in buffer.datas_mut() { + if let Some(mem) = data.data() { + let len = mem.len(); + + if !user_data.is_mjpeg { + // Handle raw formats (RGBx or I420) + let w = config_width as usize; + let h = config_height as usize; + let stride = user_data.stride as usize; + + if frame_data.len() >= w * h * 3 { + // Convert RGB to RGBA + let mut off = 0usize; + for y in 0..h { + let row_end = (off + stride).min(len); + let row = &mut mem[off..row_end]; + + for x in 0..w.min(row.len()/4) { + let src_idx = (y * w + x) * 3; + if src_idx + 2 < frame_data.len() { + row[x * 4 + 0] = frame_data[src_idx + 0]; // R + row[x * 4 + 1] = frame_data[src_idx + 1]; // G + row[x * 4 + 2] = frame_data[src_idx + 2]; // B + row[x * 4 + 3] = 255; // A + } else { + row[x * 4 + 0] = 0; // R + row[x * 4 + 1] = 0; // G + row[x * 4 + 2] = 0; // B + row[x * 4 + 3] = 255; // A + } + } + + off = off.saturating_add(stride); + if off >= len { break; } + } + + *data.chunk_mut().size_mut() = (w * h * 4) as u32; + *data.chunk_mut().stride_mut() = user_data.stride; + } else { + // Frame data too small, fill with black + for i in 0..len { + mem[i] = 0; + } + *data.chunk_mut().size_mut() = len as u32; + *data.chunk_mut().stride_mut() = user_data.stride; + } + } else { + // Handle MJPEG format - copy JPEG data directly + if frame_data.len() <= len { + mem[..frame_data.len()].copy_from_slice(&frame_data); + *data.chunk_mut().size_mut() = frame_data.len() as u32; + trace!("Copied MJPEG frame: {} bytes", frame_data.len()); + } else { + // Frame too large for buffer, truncate + mem[..len].copy_from_slice(&frame_data[..len]); + *data.chunk_mut().size_mut() = len as u32; + warn!("MJPEG frame truncated: {} -> {} bytes", frame_data.len(), len); + } + } + } + } + } else { + // No frame data available, generate black frame as fallback + trace!("No UPP frame data available, generating black frame"); + + for data in buffer.datas_mut() { + if let Some(mem) = data.data() { + let len = mem.len(); + + if !user_data.is_mjpeg { + // Fill with black for raw formats + for i in 0..len { + mem[i] = 0; + } + + let w = config_width as usize; + let h = config_height as usize; + *data.chunk_mut().size_mut() = (w * h * 4) as u32; + *data.chunk_mut().stride_mut() = user_data.stride; + } else { + // Generate minimal valid 1x1 black JPEG for MJPEG format + // This is a minimal valid JPEG that represents a 1x1 black pixel + let minimal_jpeg: [u8; 143] = [ + 0xFF, 0xD8, 0xFF, 0xE0, 0x00, 0x10, 0x4A, 0x46, 0x49, 0x46, 0x00, 0x01, + 0x01, 0x01, 0x00, 0x48, 0x00, 0x48, 0x00, 0x00, 0xFF, 0xDB, 0x00, 0x43, + 0x00, 0x08, 0x06, 0x06, 0x07, 0x06, 0x05, 0x08, 0x07, 0x07, 0x07, 0x09, + 0x09, 0x08, 0x0A, 0x0C, 0x14, 0x0D, 0x0C, 0x0B, 0x0B, 0x0C, 0x19, 0x12, + 0x13, 0x0F, 0x14, 0x1D, 0x1A, 0x1F, 0x1E, 0x1D, 0x1A, 0x1C, 0x1C, 0x20, + 0x24, 0x2E, 0x27, 0x20, 0x22, 0x2C, 0x23, 0x1C, 0x1C, 0x28, 0x37, 0x29, + 0x2C, 0x30, 0x31, 0x34, 0x34, 0x34, 0x1F, 0x27, 0x39, 0x3D, 0x38, 0x32, + 0x3C, 0x2E, 0x33, 0x34, 0x32, 0xFF, 0xC0, 0x00, 0x11, 0x08, 0x00, 0x01, + 0x00, 0x01, 0x01, 0x01, 0x11, 0x00, 0x02, 0x11, 0x01, 0x03, 0x11, 0x01, + 0xFF, 0xC4, 0x00, 0x14, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x08, 0xFF, 0xDA, + 0x00, 0x08, 0x01, 0x01, 0x00, 0x00, 0x3F, 0x00, 0x37, 0xFF, 0xD9 + ]; + + let copy_len = minimal_jpeg.len().min(len); + mem[..copy_len].copy_from_slice(&minimal_jpeg[..copy_len]); + *data.chunk_mut().size_mut() = copy_len as u32; + trace!("Generated minimal 1x1 black JPEG placeholder: {} bytes, chunk size set to {}", copy_len, copy_len); + } + } + } + } + + // Debug: Log chunk sizes before queuing + for (i, data) in buffer.datas_mut().iter_mut().enumerate() { + let chunk = data.chunk_mut(); + trace!("Buffer {} chunk {}: size={}, stride={}", i, i, chunk.size(), chunk.stride()); + } + + // Return buffer to stream by dropping it (this automatically queues it) + // The Buffer struct implements Drop which handles the queuing + drop(buffer); + }) + .register() + { + Ok(l) => l, + Err(e) => { + error!("Failed to register stream listener: {}", e); + return; + } + }; + + // Connect as an output (we are a source). Use MAP_BUFFERS only, not DRIVER. + // Serialize EnumFormat pods to bytes and build &Pod slice + let obj_to_pod = |obj: Object| -> Vec<u8> { + let value = pod::Value::Object(obj); + PodSerializer::serialize(std::io::Cursor::new(Vec::new()), &value) + .unwrap() + .0 + .into_inner() + }; + let enum_bytes: Vec<Vec<u8>> = vec![obj_to_pod(enum_mjpeg)]; + let mut enum_pods: Vec<&Pod> = enum_bytes.iter().map(|b| Pod::from_bytes(b).unwrap()).collect(); + + if let Err(e) = stream.connect( + Direction::Output, + None, + StreamFlags::MAP_BUFFERS, + &mut enum_pods[..], + ) { + error!("Failed to connect PipeWire stream: {}", e); + return; + } + + info!("Stream connected successfully"); + info!("Virtual camera node '{}' is connecting to PipeWire", config.node_name); + info!("Other applications can now attempt to negotiate formats"); + + // Wait for our node to appear in the registry and capture object.serial + let t0 = std::time::Instant::now(); + while serial_slot.load(AtomicOrdering::SeqCst) == 0 && t0.elapsed() < std::time::Duration::from_millis(1500) { + mainloop.loop_().iterate(std::time::Duration::from_millis(10)); + } + let serial_logged = serial_slot.load(AtomicOrdering::SeqCst); + if serial_logged != 0 { + info!("You can target this node with: target-object={} or target-object={}", serial_logged, config.node_name); + } else { + warn!("Node serial not observed yet in registry; it may appear a bit later."); + } + + // Run main loop until told to stop + info!("Starting main loop iteration..."); + let mut iteration_count = 0; + while running.load(Ordering::Relaxed) { + iteration_count += 1; + if iteration_count % 1000 == 0 { + info!("Main loop iteration: {}", iteration_count); + } + + // Drive loop + let result = mainloop.loop_().iterate(std::time::Duration::from_millis(16)); + if result < 0 { + error!("Main loop iteration failed with result: {}", result); + break; + } + } + info!("Main loop exited after {} iterations", iteration_count); + } + + /// Start frame processing thread + fn start_frame_processor(&mut self) -> Result<()> { + let (tx, rx) = mpsc::channel(); + self.stats_frame_sender = Some(tx); // Use separate sender for stats + + let running = Arc::clone(&self.running); + let stats = Arc::clone(&self.stats); + let last_frame_time = Arc::clone(&self.last_frame_time); + + thread::spawn(move || { + Self::frame_processing_loop(rx, running, stats, last_frame_time); + }); + + info!("Frame processing thread started"); + Ok(()) + } + + /// Frame processing loop that runs in a separate thread + fn frame_processing_loop( + rx: Receiver<Vec<u8>>, + running: Arc<AtomicBool>, + stats: Arc<Mutex<VideoStats>>, + last_frame_time: Arc<Mutex<Instant>>, + ) { + while running.load(Ordering::Relaxed) { + match rx.recv_timeout(std::time::Duration::from_millis(100)) { + Ok(frame_data) => { + // Process frame and update statistics + Self::update_stats(&stats, &last_frame_time, frame_data.len()); + trace!("Frame processed: {} bytes", frame_data.len()); + } + Err(mpsc::RecvTimeoutError::Timeout) => { + continue; + } + Err(mpsc::RecvTimeoutError::Disconnected) => { + break; + } + } + } + } + + /// Update statistics with proper FPS calculation + fn update_stats( + stats: &Arc<Mutex<VideoStats>>, + last_frame_time: &Arc<Mutex<Instant>>, + frame_size: usize, + ) { + let mut stats_guard = stats.lock().unwrap(); + stats_guard.frames_pushed += 1; + stats_guard.total_bytes += frame_size as u64; + stats_guard.backend_type = super::VideoBackendType::PipeWire; + stats_guard.is_ready = true; + + let now = Instant::now(); + let mut last_time = last_frame_time.lock().unwrap(); + let duration = now.duration_since(*last_time); + *last_time = now; + + if duration.as_millis() > 0 { + stats_guard.fps = 1000.0 / duration.as_millis() as f64; + } + } + + /// Get current node information for external tools + pub fn get_node_info(&self) -> Option<(u32, u32, String)> { + // This would need to be implemented with proper synchronization + // For now, return the config info + Some(( + self.virtual_node_id.unwrap_or(0), + 0, // object.serial - would need to be stored from the stream + self.config.node_name.clone(), + )) + } + + /// Get the object.serial for targeting (if available) + pub fn get_object_serial(&self) -> Option<u32> { + // This would need to be implemented with proper synchronization + // For now, return None - the serial is logged when discovered + None + } + + /// Check if the virtual camera node is registered and discoverable + pub fn is_node_registered(&self) -> bool { + self.is_initialized && self.running.load(Ordering::Relaxed) + } +} + +impl VideoBackendTrait for PipeWireBackend { + fn initialize(&mut self) -> Result<()> { + if self.is_initialized { + return Ok(()); + } + + info!("Initializing PipeWire backend with native library..."); + + if let Err(e) = self.check_pipewire_available() { + error!("PipeWire not available: {}", e); + return Err(VideoError::DeviceNotReady.into()); + } + + if let Err(e) = self.create_virtual_camera_node() { + error!("Failed to create virtual camera node: {}", e); + return Err(VideoError::DeviceNotReady.into()); + } + + if let Err(e) = self.start_frame_processor() { + error!("Failed to start frame processor: {}", e); + return Err(VideoError::DeviceNotReady.into()); + } + + self.is_initialized = true; + self.running.store(true, Ordering::Relaxed); + info!("PipeWire backend initialized successfully with native library"); + // Remove premature logging - the actual node creation will be logged in the PipeWire thread + // when the node is actually available + + Ok(()) + } + + fn push_frame(&self, frame_data: &[u8]) -> Result<()> { + if !self.running.load(Ordering::Relaxed) { + return Err(VideoError::DeviceNotReady.into()); + } + + if !self.is_initialized { + return Err(VideoError::DeviceNotReady.into()); + } + + trace!("Queueing frame for PipeWire: {} bytes", frame_data.len()); + + // Send to PipeWire thread + if let Some(sender) = &self.pw_frame_sender { + if let Err(e) = sender.send(frame_data.to_vec()) { + error!("Failed to queue frame to PipeWire: {}", e); + return Err(VideoError::DeviceNotReady.into()); + } + debug!("Frame queued for PipeWire processing: {} bytes", frame_data.len()); + } else { + error!("PipeWire frame sender not available"); + return Err(VideoError::DeviceNotReady.into()); + } + + // Send to stats thread + if let Some(sender) = &self.stats_frame_sender { + if let Err(e) = sender.send(frame_data.to_vec()) { + error!("Failed to queue frame to stats: {}", e); + // Don't fail the entire operation for stats + } + } + + trace!("Frame queued successfully"); + Ok(()) + } + + fn get_stats(&self) -> VideoStats { + self.stats.lock().unwrap().clone() + } + + fn is_ready(&self) -> bool { + self.is_initialized && self.running.load(Ordering::Relaxed) + } + + fn shutdown(&mut self) -> Result<()> { + if !self.is_initialized { + return Ok(()) + } + + info!("Shutting down PipeWire backend..."); + + self.running.store(false, Ordering::Relaxed); + + if let Some(handle) = self.pw_thread.take() { + if let Err(e) = handle.join() { + error!("Error joining PipeWire thread: {:?}", e); + } + } + + self.virtual_node_id = None; + self.pw_frame_sender = None; + self.stats_frame_sender = None; + + self.is_initialized = false; + info!("PipeWire backend shut down successfully"); + + Ok(()) + } +} + +impl Drop for PipeWireBackend { + fn drop(&mut self) { + self.running.store(false, Ordering::Relaxed); + + if let Some(handle) = self.pw_thread.take() { + let _ = handle.join(); + } + + // Note: frame_senders will be dropped automatically + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_pipewire_backend_creation() { + let config = PipeWireConfig::default(); + let backend = PipeWireBackend::new(config); + + assert_eq!(backend.config.node_name, "geek-szitman-supercamera"); + assert_eq!(backend.config.description, "Geek Szitman SuperCamera - High-quality virtual camera for streaming and recording"); + assert_eq!(backend.config.media_class, "Video/Source"); + assert_eq!(backend.config.format, VideoFormat::MJPEG); + assert_eq!(backend.config.width, 640); + assert_eq!(backend.config.height, 480); + assert_eq!(backend.config.framerate, 30); + } + + #[test] + fn test_pipewire_backend_default_config() { + let config = PipeWireConfig::default(); + let backend = PipeWireBackend::new(config); + + assert!(!backend.is_initialized); + assert!(!backend.is_ready()); + } +} |
