//! PipeWire backend for video streaming using native library use super::{VideoBackendTrait, VideoFormat, VideoStats}; use crate::error::{Result, VideoError}; use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicU32, Ordering, Ordering as AtomicOrdering}; use std::sync::Mutex; use std::time::Instant; use tracing::{debug, info, trace, error, warn}; use std::thread; use std::sync::mpsc::{self, Sender, Receiver}; // PipeWire imports use pipewire::{ main_loop::MainLoop, context::Context, stream::{Stream, StreamFlags, StreamState}, properties::properties, keys, spa::pod::{self, Pod, Object}, spa::utils::{Direction, SpaTypes, Fraction, Rectangle}, spa::param::ParamType, spa::param::format::{FormatProperties, MediaSubtype, MediaType}, spa::pod::serialize::PodSerializer, }; /// PipeWire backend implementation using native library pub struct PipeWireBackend { is_initialized: bool, stats: Arc>, config: PipeWireConfig, running: Arc, virtual_node_id: Option, pw_frame_sender: Option>>, // Separate sender for PipeWire thread stats_frame_sender: Option>>, // Separate sender for stats thread last_frame_time: Arc>, // PipeWire objects - these need to be in a separate thread-safe context pw_thread: Option>, } /// PipeWire configuration #[derive(Debug, Clone)] pub struct PipeWireConfig { pub node_name: String, pub description: String, pub media_class: String, pub format: VideoFormat, pub width: u32, pub height: u32, pub framerate: u32, } impl Default for PipeWireConfig { fn default() -> Self { Self { node_name: "geek-szitman-supercamera".to_string(), description: "Geek Szitman SuperCamera - High-quality virtual camera for streaming and recording".to_string(), media_class: "Video/Source".to_string(), format: VideoFormat::MJPEG, // Changed back to MJPEG since that's what the camera provides width: 640, height: 480, framerate: 30, } } } impl PipeWireBackend { pub fn new(config: PipeWireConfig) -> Self { Self { is_initialized: false, stats: Arc::new(Mutex::new(VideoStats::default())), config, running: Arc::new(AtomicBool::new(false)), virtual_node_id: None, pw_frame_sender: None, stats_frame_sender: None, last_frame_time: Arc::new(Mutex::new(Instant::now())), pw_thread: None, } } /// Check if PipeWire is available and running fn check_pipewire_available(&self) -> Result<()> { info!("Checking PipeWire availability..."); // This is a basic check - in a real implementation you might want to // try to connect to the daemon to verify it's actually running info!("PipeWire availability check passed"); Ok(()) } /// Create a virtual camera node using native PipeWire API fn create_virtual_camera_node(&mut self) -> Result<()> { info!("Creating PipeWire virtual camera node using native API..."); info!("Node name: '{}'", self.config.node_name); info!("Node description: '{}'", self.config.description); // Start PipeWire processing in a separate thread to avoid Send/Sync issues // The actual node creation and availability will be logged in the PipeWire thread // Ensure the processing loop runs self.running.store(true, Ordering::Relaxed); let running = Arc::clone(&self.running); let config = self.config.clone(); // Create channel for frame communication with PipeWire thread let (frame_sender, frame_receiver) = mpsc::channel(); self.pw_frame_sender = Some(frame_sender); info!("Starting PipeWire thread..."); let handle = thread::spawn(move || { info!("PipeWire thread started, entering main loop..."); // Set panic hook to catch any panics in this thread std::panic::set_hook(Box::new(|panic_info| { error!("PipeWire thread panicked: {:?}", panic_info); })); Self::pipewire_main_loop(running, config, frame_receiver); info!("PipeWire thread exiting..."); }); self.pw_thread = Some(handle); self.virtual_node_id = Some(999); // Placeholder - will be updated when stream is ready info!("Virtual camera node creation initiated in separate thread"); Ok(()) } /// Main PipeWire loop that runs in a separate thread fn pipewire_main_loop(running: Arc, config: PipeWireConfig, frame_receiver: Receiver>) { info!("Starting PipeWire main loop in thread"); info!("Initializing PipeWire..."); pipewire::init(); info!("PipeWire initialized successfully"); // Create main loop with no properties info!("Creating PipeWire main loop..."); let mainloop = match MainLoop::new(None) { Ok(ml) => { info!("Main loop created successfully"); ml }, Err(e) => { error!("Failed to create PipeWire main loop: {}", e); error!("MainLoop::new error details: {:?}", e); return; } }; // Create context info!("Creating PipeWire context..."); let context = match Context::new(&mainloop) { Ok(ctx) => { info!("Context created successfully"); ctx }, Err(e) => { error!("Failed to create PipeWire context: {}", e); return; } }; // Connect to PipeWire daemon info!("Connecting to PipeWire daemon..."); let core = match context.connect(None) { Ok(c) => { info!("Connected to PipeWire daemon successfully"); c }, Err(e) => { error!("Failed to connect to PipeWire daemon: {}", e); return; } }; info!("PipeWire connection established successfully"); // Set up registry listener to capture object.serial when our node appears let serial_slot = Arc::new(AtomicU32::new(0)); let serial_slot_clone = Arc::clone(&serial_slot); let wanted_name = config.node_name.clone(); let registry = core.get_registry().expect("get_registry"); let _reg_listener = registry .add_listener_local() .global(move |global_obj| { if global_obj.type_ == pipewire::types::ObjectType::Node { if let Some(props) = &global_obj.props { if let Some(name) = props.get("node.name") { if name == wanted_name { if let Some(s) = props.get("object.serial") { if let Ok(v) = s.parse::() { serial_slot_clone.store(v, AtomicOrdering::SeqCst); info!("Discovered our node in registry: node.name={} object.serial={}", name, v); } } } } } } }) .register(); // User data for stream callbacks #[derive(Debug)] struct UserData { is_mjpeg: bool, frame_size: u32, stride: i32, current_frame: Arc>>>, } let current_frame = Arc::new(Mutex::new(None)); let current_frame_clone = Arc::clone(¤t_frame); // Start frame receiver thread let frame_receiver = Arc::new(Mutex::new(frame_receiver)); let frame_receiver_clone = Arc::clone(&frame_receiver); let running_clone = Arc::clone(&running); let _frame_thread = thread::spawn(move || { while running_clone.load(Ordering::Relaxed) { let frame_data = { let receiver_guard = frame_receiver_clone.lock().unwrap(); match receiver_guard.recv_timeout(std::time::Duration::from_millis(16)) { Ok(data) => Some(data), Err(mpsc::RecvTimeoutError::Timeout) => None, Err(mpsc::RecvTimeoutError::Disconnected) => break, } }; if let Some(frame_data) = frame_data { let mut frame_guard = current_frame_clone.lock().unwrap(); *frame_guard = Some(frame_data); trace!("Received new frame for PipeWire processing"); } } }); // Create a stream that will act as a video source let stream = match Stream::new( &core, &config.node_name, properties! { // Essential keys for Video/Source classification *keys::MEDIA_CLASS => "Video/Source", *keys::NODE_NAME => config.node_name.as_str(), *keys::APP_NAME => "geek-szitman-supercamera", *keys::NODE_DESCRIPTION => config.description.as_str(), // Additional metadata "media.role" => "Camera", "media.category" => "Capture", // Optional cosmetics "media.nick" => "SuperCamera", "device.icon_name" => "camera-web", // Prevent PipeWire from trying to drive the graph until someone connects "node.passive" => "true", }, ) { Ok(s) => s, Err(e) => { error!("Failed to create PipeWire stream: {}", e); return; } }; // Build EnumFormat pod(s) - simplified to just MJPEG let width_u = config.width as u32; let height_u = config.height as u32; let fps_u = config.framerate as u32; // MJPEG: JPEG compressed - simplified format let enum_mjpeg = pod::object!( SpaTypes::ObjectParamFormat, ParamType::EnumFormat, pod::property!(FormatProperties::MediaType, Id, MediaType::Video), pod::property!(FormatProperties::MediaSubtype, Id, MediaSubtype::Mjpg), pod::property!( FormatProperties::VideoSize, Choice, Range, Rectangle, Rectangle { width: width_u, height: height_u }, Rectangle { width: 16, height: 16 }, Rectangle { width: 4096, height: 4096 } ), pod::property!( FormatProperties::VideoFramerate, Choice, Range, Fraction, Fraction { num: fps_u, denom: 1 }, Fraction { num: 1, denom: 1 }, Fraction { num: 120, denom: 1 } ), ); // Clone config values for closures let config_width = config.width; let config_height = config.height; let config_framerate = config.framerate; // Set up stream callbacks let _listener = match stream .add_local_listener_with_user_data(UserData { is_mjpeg: false, frame_size: 4 * 1024 * 1024, // safe cap stride: 0, current_frame: Arc::clone(¤t_frame), }) .state_changed(move |stream, _user_data, old, new| { info!("PipeWire stream state: {:?} -> {:?}", old, new); if matches!(new, StreamState::Paused | StreamState::Streaming) { info!("PipeWire node is ready and can be targeted by applications"); } if new == StreamState::Paused { if let Err(e) = stream.set_active(true) { error!("Failed to activate PipeWire stream: {}", e); } else { info!("Activated stream scheduling"); } } if new == StreamState::Streaming { info!("Stream is now streaming - virtual camera is active!"); } }) .param_changed(move |stream, user_data, id, param| { if let Some(param) = param { info!("Param changed: id={:?}, type={:?}, raw_id={}", id, param.type_(), id); // Handle format negotiation - simplified approach if id == ParamType::Format.as_raw() || id == ParamType::EnumFormat.as_raw() || id == 15 { info!("Format param received (id={}), setting up basic MJPEG format...", id); // Set basic MJPEG parameters user_data.is_mjpeg = true; user_data.frame_size = 4 * 1024 * 1024; // 4MB safe cap for MJPEG user_data.stride = 0; // MJPEG doesn't have stride info!("Basic MJPEG format configured: {}x{} @ {} fps", config_width, config_height, config_framerate); // Try to activate the stream directly if let Err(e) = stream.set_active(true) { error!("Failed to activate stream: {}", e); } else { info!("Stream activated successfully"); } } else { trace!("Stream param changed: id={} (ignored)", id); } } else { trace!("Stream param changed: id={} (ignored)", id); } }) .process(move |stream, user_data| { // Dequeue buffer let Some(mut buffer) = stream.dequeue_buffer() else { trace!("Out of buffers"); return; }; // Get the current frame from UPP protocol let frame_data = { let frame_guard = user_data.current_frame.lock().unwrap(); frame_guard.clone() }; if let Some(frame_data) = frame_data { // Process actual camera frame data from UPP protocol trace!("Processing UPP camera frame: {} bytes", frame_data.len()); for data in buffer.datas_mut() { if let Some(mem) = data.data() { let len = mem.len(); if !user_data.is_mjpeg { // Handle raw formats (RGBx or I420) let w = config_width as usize; let h = config_height as usize; let stride = user_data.stride as usize; if frame_data.len() >= w * h * 3 { // Convert RGB to RGBA let mut off = 0usize; for y in 0..h { let row_end = (off + stride).min(len); let row = &mut mem[off..row_end]; for x in 0..w.min(row.len()/4) { let src_idx = (y * w + x) * 3; if src_idx + 2 < frame_data.len() { row[x * 4 + 0] = frame_data[src_idx + 0]; // R row[x * 4 + 1] = frame_data[src_idx + 1]; // G row[x * 4 + 2] = frame_data[src_idx + 2]; // B row[x * 4 + 3] = 255; // A } else { row[x * 4 + 0] = 0; // R row[x * 4 + 1] = 0; // G row[x * 4 + 2] = 0; // B row[x * 4 + 3] = 255; // A } } off = off.saturating_add(stride); if off >= len { break; } } *data.chunk_mut().size_mut() = (w * h * 4) as u32; *data.chunk_mut().stride_mut() = user_data.stride; } else { // Frame data too small, fill with black for i in 0..len { mem[i] = 0; } *data.chunk_mut().size_mut() = len as u32; *data.chunk_mut().stride_mut() = user_data.stride; } } else { // Handle MJPEG format - copy JPEG data directly if frame_data.len() <= len { mem[..frame_data.len()].copy_from_slice(&frame_data); *data.chunk_mut().size_mut() = frame_data.len() as u32; trace!("Copied MJPEG frame: {} bytes", frame_data.len()); } else { // Frame too large for buffer, truncate mem[..len].copy_from_slice(&frame_data[..len]); *data.chunk_mut().size_mut() = len as u32; warn!("MJPEG frame truncated: {} -> {} bytes", frame_data.len(), len); } } } } } else { // No frame data available, generate black frame as fallback trace!("No UPP frame data available, generating black frame"); for data in buffer.datas_mut() { if let Some(mem) = data.data() { let len = mem.len(); if !user_data.is_mjpeg { // Fill with black for raw formats for i in 0..len { mem[i] = 0; } let w = config_width as usize; let h = config_height as usize; *data.chunk_mut().size_mut() = (w * h * 4) as u32; *data.chunk_mut().stride_mut() = user_data.stride; } else { // Generate minimal valid 1x1 black JPEG for MJPEG format // This is a minimal valid JPEG that represents a 1x1 black pixel let minimal_jpeg: [u8; 143] = [ 0xFF, 0xD8, 0xFF, 0xE0, 0x00, 0x10, 0x4A, 0x46, 0x49, 0x46, 0x00, 0x01, 0x01, 0x01, 0x00, 0x48, 0x00, 0x48, 0x00, 0x00, 0xFF, 0xDB, 0x00, 0x43, 0x00, 0x08, 0x06, 0x06, 0x07, 0x06, 0x05, 0x08, 0x07, 0x07, 0x07, 0x09, 0x09, 0x08, 0x0A, 0x0C, 0x14, 0x0D, 0x0C, 0x0B, 0x0B, 0x0C, 0x19, 0x12, 0x13, 0x0F, 0x14, 0x1D, 0x1A, 0x1F, 0x1E, 0x1D, 0x1A, 0x1C, 0x1C, 0x20, 0x24, 0x2E, 0x27, 0x20, 0x22, 0x2C, 0x23, 0x1C, 0x1C, 0x28, 0x37, 0x29, 0x2C, 0x30, 0x31, 0x34, 0x34, 0x34, 0x1F, 0x27, 0x39, 0x3D, 0x38, 0x32, 0x3C, 0x2E, 0x33, 0x34, 0x32, 0xFF, 0xC0, 0x00, 0x11, 0x08, 0x00, 0x01, 0x00, 0x01, 0x01, 0x01, 0x11, 0x00, 0x02, 0x11, 0x01, 0x03, 0x11, 0x01, 0xFF, 0xC4, 0x00, 0x14, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x08, 0xFF, 0xDA, 0x00, 0x08, 0x01, 0x01, 0x00, 0x00, 0x3F, 0x00, 0x37, 0xFF, 0xD9 ]; let copy_len = minimal_jpeg.len().min(len); mem[..copy_len].copy_from_slice(&minimal_jpeg[..copy_len]); *data.chunk_mut().size_mut() = copy_len as u32; trace!("Generated minimal 1x1 black JPEG placeholder: {} bytes, chunk size set to {}", copy_len, copy_len); } } } } // Debug: Log chunk sizes before queuing for (i, data) in buffer.datas_mut().iter_mut().enumerate() { let chunk = data.chunk_mut(); trace!("Buffer {} chunk {}: size={}, stride={}", i, i, chunk.size(), chunk.stride()); } // Return buffer to stream by dropping it (this automatically queues it) // The Buffer struct implements Drop which handles the queuing drop(buffer); }) .register() { Ok(l) => l, Err(e) => { error!("Failed to register stream listener: {}", e); return; } }; // Connect as an output (we are a source). Use MAP_BUFFERS only, not DRIVER. // Serialize EnumFormat pods to bytes and build &Pod slice let obj_to_pod = |obj: Object| -> Vec { let value = pod::Value::Object(obj); PodSerializer::serialize(std::io::Cursor::new(Vec::new()), &value) .unwrap() .0 .into_inner() }; let enum_bytes: Vec> = vec![obj_to_pod(enum_mjpeg)]; let mut enum_pods: Vec<&Pod> = enum_bytes.iter().map(|b| Pod::from_bytes(b).unwrap()).collect(); if let Err(e) = stream.connect( Direction::Output, None, StreamFlags::MAP_BUFFERS, &mut enum_pods[..], ) { error!("Failed to connect PipeWire stream: {}", e); return; } info!("Stream connected successfully"); info!("Virtual camera node '{}' is connecting to PipeWire", config.node_name); info!("Other applications can now attempt to negotiate formats"); // Wait for our node to appear in the registry and capture object.serial let t0 = std::time::Instant::now(); while serial_slot.load(AtomicOrdering::SeqCst) == 0 && t0.elapsed() < std::time::Duration::from_millis(1500) { mainloop.loop_().iterate(std::time::Duration::from_millis(10)); } let serial_logged = serial_slot.load(AtomicOrdering::SeqCst); if serial_logged != 0 { info!("You can target this node with: target-object={} or target-object={}", serial_logged, config.node_name); } else { warn!("Node serial not observed yet in registry; it may appear a bit later."); } // Run main loop until told to stop info!("Starting main loop iteration..."); let mut iteration_count = 0; while running.load(Ordering::Relaxed) { iteration_count += 1; if iteration_count % 1000 == 0 { info!("Main loop iteration: {}", iteration_count); } // Drive loop let result = mainloop.loop_().iterate(std::time::Duration::from_millis(16)); if result < 0 { error!("Main loop iteration failed with result: {}", result); break; } } info!("Main loop exited after {} iterations", iteration_count); } /// Start frame processing thread fn start_frame_processor(&mut self) -> Result<()> { let (tx, rx) = mpsc::channel(); self.stats_frame_sender = Some(tx); // Use separate sender for stats let running = Arc::clone(&self.running); let stats = Arc::clone(&self.stats); let last_frame_time = Arc::clone(&self.last_frame_time); thread::spawn(move || { Self::frame_processing_loop(rx, running, stats, last_frame_time); }); info!("Frame processing thread started"); Ok(()) } /// Frame processing loop that runs in a separate thread fn frame_processing_loop( rx: Receiver>, running: Arc, stats: Arc>, last_frame_time: Arc>, ) { while running.load(Ordering::Relaxed) { match rx.recv_timeout(std::time::Duration::from_millis(100)) { Ok(frame_data) => { // Process frame and update statistics Self::update_stats(&stats, &last_frame_time, frame_data.len()); trace!("Frame processed: {} bytes", frame_data.len()); } Err(mpsc::RecvTimeoutError::Timeout) => { continue; } Err(mpsc::RecvTimeoutError::Disconnected) => { break; } } } } /// Update statistics with proper FPS calculation fn update_stats( stats: &Arc>, last_frame_time: &Arc>, frame_size: usize, ) { let mut stats_guard = stats.lock().unwrap(); stats_guard.frames_pushed += 1; stats_guard.total_bytes += frame_size as u64; stats_guard.backend_type = super::VideoBackendType::PipeWire; stats_guard.is_ready = true; let now = Instant::now(); let mut last_time = last_frame_time.lock().unwrap(); let duration = now.duration_since(*last_time); *last_time = now; if duration.as_millis() > 0 { stats_guard.fps = 1000.0 / duration.as_millis() as f64; } } /// Get current node information for external tools pub fn get_node_info(&self) -> Option<(u32, u32, String)> { // This would need to be implemented with proper synchronization // For now, return the config info Some(( self.virtual_node_id.unwrap_or(0), 0, // object.serial - would need to be stored from the stream self.config.node_name.clone(), )) } /// Get the object.serial for targeting (if available) pub fn get_object_serial(&self) -> Option { // This would need to be implemented with proper synchronization // For now, return None - the serial is logged when discovered None } /// Check if the virtual camera node is registered and discoverable pub fn is_node_registered(&self) -> bool { self.is_initialized && self.running.load(Ordering::Relaxed) } } impl VideoBackendTrait for PipeWireBackend { fn initialize(&mut self) -> Result<()> { if self.is_initialized { return Ok(()); } info!("Initializing PipeWire backend with native library..."); if let Err(e) = self.check_pipewire_available() { error!("PipeWire not available: {}", e); return Err(VideoError::DeviceNotReady.into()); } if let Err(e) = self.create_virtual_camera_node() { error!("Failed to create virtual camera node: {}", e); return Err(VideoError::DeviceNotReady.into()); } if let Err(e) = self.start_frame_processor() { error!("Failed to start frame processor: {}", e); return Err(VideoError::DeviceNotReady.into()); } self.is_initialized = true; self.running.store(true, Ordering::Relaxed); info!("PipeWire backend initialized successfully with native library"); // Remove premature logging - the actual node creation will be logged in the PipeWire thread // when the node is actually available Ok(()) } fn push_frame(&self, frame_data: &[u8]) -> Result<()> { if !self.running.load(Ordering::Relaxed) { return Err(VideoError::DeviceNotReady.into()); } if !self.is_initialized { return Err(VideoError::DeviceNotReady.into()); } trace!("Queueing frame for PipeWire: {} bytes", frame_data.len()); // Send to PipeWire thread if let Some(sender) = &self.pw_frame_sender { if let Err(e) = sender.send(frame_data.to_vec()) { error!("Failed to queue frame to PipeWire: {}", e); return Err(VideoError::DeviceNotReady.into()); } debug!("Frame queued for PipeWire processing: {} bytes", frame_data.len()); } else { error!("PipeWire frame sender not available"); return Err(VideoError::DeviceNotReady.into()); } // Send to stats thread if let Some(sender) = &self.stats_frame_sender { if let Err(e) = sender.send(frame_data.to_vec()) { error!("Failed to queue frame to stats: {}", e); // Don't fail the entire operation for stats } } trace!("Frame queued successfully"); Ok(()) } fn get_stats(&self) -> VideoStats { self.stats.lock().unwrap().clone() } fn is_ready(&self) -> bool { self.is_initialized && self.running.load(Ordering::Relaxed) } fn shutdown(&mut self) -> Result<()> { if !self.is_initialized { return Ok(()) } info!("Shutting down PipeWire backend..."); self.running.store(false, Ordering::Relaxed); if let Some(handle) = self.pw_thread.take() { if let Err(e) = handle.join() { error!("Error joining PipeWire thread: {:?}", e); } } self.virtual_node_id = None; self.pw_frame_sender = None; self.stats_frame_sender = None; self.is_initialized = false; info!("PipeWire backend shut down successfully"); Ok(()) } } impl Drop for PipeWireBackend { fn drop(&mut self) { self.running.store(false, Ordering::Relaxed); if let Some(handle) = self.pw_thread.take() { let _ = handle.join(); } // Note: frame_senders will be dropped automatically } } #[cfg(test)] mod tests { use super::*; #[test] fn test_pipewire_backend_creation() { let config = PipeWireConfig::default(); let backend = PipeWireBackend::new(config); assert_eq!(backend.config.node_name, "geek-szitman-supercamera"); assert_eq!(backend.config.description, "Geek Szitman SuperCamera - High-quality virtual camera for streaming and recording"); assert_eq!(backend.config.media_class, "Video/Source"); assert_eq!(backend.config.format, VideoFormat::MJPEG); assert_eq!(backend.config.width, 640); assert_eq!(backend.config.height, 480); assert_eq!(backend.config.framerate, 30); } #[test] fn test_pipewire_backend_default_config() { let config = PipeWireConfig::default(); let backend = PipeWireBackend::new(config); assert!(!backend.is_initialized); assert!(!backend.is_ready()); } }