Diffstat (limited to 'src/video/pipewire.rs')
-rw-r--r--  src/video/pipewire.rs  773
1 file changed, 773 insertions(+), 0 deletions(-)
diff --git a/src/video/pipewire.rs b/src/video/pipewire.rs
new file mode 100644
index 0000000..62dad11
--- /dev/null
+++ b/src/video/pipewire.rs
@@ -0,0 +1,773 @@
+//! PipeWire backend for video streaming using the native PipeWire library
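+//!
+//! Minimal usage sketch (assumes the `VideoBackendTrait` methods implemented
+//! below and a caller that already holds an encoded MJPEG frame):
+//!
+//! ```ignore
+//! let mut backend = PipeWireBackend::new(PipeWireConfig::default());
+//! backend.initialize()?;             // spawns the PipeWire thread and registers the node
+//! backend.push_frame(&mjpeg_bytes)?; // `mjpeg_bytes: &[u8]`, hypothetical caller data
+//! backend.shutdown()?;               // stop the loop and join the thread
+//! ```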
+
+use super::{VideoBackendTrait, VideoFormat, VideoStats};
+use crate::error::{Result, VideoError};
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
+use std::sync::Mutex;
+use std::time::Instant;
+use tracing::{debug, info, trace, error, warn};
+use std::thread;
+use std::sync::mpsc::{self, Sender, Receiver};
+
+// PipeWire imports
+use pipewire::{
+ main_loop::MainLoop,
+ context::Context,
+ stream::{Stream, StreamFlags, StreamState},
+ properties::properties,
+ keys,
+ spa::pod::{self, Pod, Object},
+ spa::utils::{Direction, SpaTypes, Fraction, Rectangle},
+ spa::param::ParamType,
+ spa::param::format::{FormatProperties, MediaSubtype, MediaType},
+ spa::pod::serialize::PodSerializer,
+};
+
+/// PipeWire backend implementation using the native PipeWire library
+pub struct PipeWireBackend {
+ is_initialized: bool,
+ stats: Arc<Mutex<VideoStats>>,
+ config: PipeWireConfig,
+ running: Arc<AtomicBool>,
+ virtual_node_id: Option<u32>,
+ pw_frame_sender: Option<Sender<Vec<u8>>>, // Separate sender for PipeWire thread
+ stats_frame_sender: Option<Sender<Vec<u8>>>, // Separate sender for stats thread
+ last_frame_time: Arc<Mutex<Instant>>,
+
+ // PipeWire objects (MainLoop, Stream, ...) are not Send, so they live
+ // entirely on a dedicated thread; only its JoinHandle is kept here
+ pw_thread: Option<thread::JoinHandle<()>>,
+}
+
+/// PipeWire configuration
+#[derive(Debug, Clone)]
+pub struct PipeWireConfig {
+ pub node_name: String,
+ pub description: String,
+ pub media_class: String,
+ pub format: VideoFormat,
+ pub width: u32,
+ pub height: u32,
+ pub framerate: u32,
+}
+
+impl Default for PipeWireConfig {
+ fn default() -> Self {
+ Self {
+ node_name: "geek-szitman-supercamera".to_string(),
+ description: "Geek Szitman SuperCamera - High-quality virtual camera for streaming and recording".to_string(),
+ media_class: "Video/Source".to_string(),
+ format: VideoFormat::MJPEG, // MJPEG matches what the upstream camera provides
+ width: 640,
+ height: 480,
+ framerate: 30,
+ }
+ }
+}
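+
+// A non-default configuration can use struct-update syntax; a hypothetical
+// 1080p30 setup, for example:
+//
+//     let config = PipeWireConfig { width: 1920, height: 1080, ..Default::default() };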
+
+impl PipeWireBackend {
+ pub fn new(config: PipeWireConfig) -> Self {
+ Self {
+ is_initialized: false,
+ stats: Arc::new(Mutex::new(VideoStats::default())),
+ config,
+ running: Arc::new(AtomicBool::new(false)),
+ virtual_node_id: None,
+ pw_frame_sender: None,
+ stats_frame_sender: None,
+ last_frame_time: Arc::new(Mutex::new(Instant::now())),
+ pw_thread: None,
+ }
+ }
+
+ /// Check if PipeWire is available and running
+ fn check_pipewire_available(&self) -> Result<()> {
+ info!("Checking PipeWire availability...");
+ // Best-effort check: look for the daemon's default socket at
+ // $XDG_RUNTIME_DIR/pipewire-0. The authoritative check is the actual
+ // connection attempt made later in the PipeWire thread.
+ if let Ok(dir) = std::env::var("XDG_RUNTIME_DIR") {
+ if !std::path::Path::new(&dir).join("pipewire-0").exists() {
+ warn!("PipeWire socket not found at {}/pipewire-0; the daemon may not be running", dir);
+ }
+ }
+ info!("PipeWire availability check passed");
+ Ok(())
+ }
+
+ /// Create a virtual camera node using native PipeWire API
+ fn create_virtual_camera_node(&mut self) -> Result<()> {
+ info!("Creating PipeWire virtual camera node using native API...");
+ info!("Node name: '{}'", self.config.node_name);
+ info!("Node description: '{}'", self.config.description);
+
+ // Start PipeWire processing in a separate thread to avoid Send/Sync issues
+ // The actual node creation and availability will be logged in the PipeWire thread
+ // Ensure the processing loop runs
+ self.running.store(true, Ordering::Relaxed);
+ let running = Arc::clone(&self.running);
+ let config = self.config.clone();
+
+ // Create channel for frame communication with PipeWire thread
+ let (frame_sender, frame_receiver) = mpsc::channel();
+ self.pw_frame_sender = Some(frame_sender);
+
+ info!("Starting PipeWire thread...");
+ let handle = thread::spawn(move || {
+ info!("PipeWire thread started, entering main loop...");
+ // Log panics from this thread. Note that std::panic::set_hook installs
+ // a process-wide hook, not a thread-local one.
+ std::panic::set_hook(Box::new(|panic_info| {
+ error!("PipeWire thread panicked: {:?}", panic_info);
+ }));
+
+ Self::pipewire_main_loop(running, config, frame_receiver);
+ info!("PipeWire thread exiting...");
+ });
+
+ self.pw_thread = Some(handle);
+ self.virtual_node_id = Some(999); // Placeholder - will be updated when stream is ready
+ info!("Virtual camera node creation initiated in separate thread");
+ Ok(())
+ }
+
+ /// Main PipeWire loop that runs in a separate thread
+ fn pipewire_main_loop(running: Arc<AtomicBool>, config: PipeWireConfig, frame_receiver: Receiver<Vec<u8>>) {
+ info!("Starting PipeWire main loop in thread");
+
+ info!("Initializing PipeWire...");
+ pipewire::init();
+ info!("PipeWire initialized successfully");
+
+ // Create main loop with no properties
+ info!("Creating PipeWire main loop...");
+ let mainloop = match MainLoop::new(None) {
+ Ok(ml) => {
+ info!("Main loop created successfully");
+ ml
+ },
+ Err(e) => {
+ error!("Failed to create PipeWire main loop: {}", e);
+ error!("MainLoop::new error details: {:?}", e);
+ return;
+ }
+ };
+
+ // Create context
+ info!("Creating PipeWire context...");
+ let context = match Context::new(&mainloop) {
+ Ok(ctx) => {
+ info!("Context created successfully");
+ ctx
+ },
+ Err(e) => {
+ error!("Failed to create PipeWire context: {}", e);
+ return;
+ }
+ };
+
+ // Connect to PipeWire daemon
+ info!("Connecting to PipeWire daemon...");
+ let core = match context.connect(None) {
+ Ok(c) => {
+ info!("Connected to PipeWire daemon successfully");
+ c
+ },
+ Err(e) => {
+ error!("Failed to connect to PipeWire daemon: {}", e);
+ return;
+ }
+ };
+
+ info!("PipeWire connection established successfully");
+
+ // Set up registry listener to capture object.serial when our node appears
+ let serial_slot = Arc::new(AtomicU32::new(0));
+ let serial_slot_clone = Arc::clone(&serial_slot);
+ let wanted_name = config.node_name.clone();
+
+ let registry = core.get_registry().expect("get_registry");
+ let _reg_listener = registry
+ .add_listener_local()
+ .global(move |global_obj| {
+ if global_obj.type_ == pipewire::types::ObjectType::Node {
+ if let Some(props) = &global_obj.props {
+ if let Some(name) = props.get("node.name") {
+ if name == wanted_name {
+ if let Some(s) = props.get("object.serial") {
+ if let Ok(v) = s.parse::<u32>() {
+ serial_slot_clone.store(v, Ordering::SeqCst);
+ info!("Discovered our node in registry: node.name={} object.serial={}", name, v);
+ }
+ }
+ }
+ }
+ }
+ }
+ })
+ .register();
+
+ // User data for stream callbacks
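+ // `is_mjpeg` flips to true once format negotiation settles on MJPEG;
+ // `current_frame` is the shared slot the receiver thread fills and the
+ // process() callback drains.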
+ #[derive(Debug)]
+ struct UserData {
+ is_mjpeg: bool,
+ frame_size: u32,
+ stride: i32,
+ current_frame: Arc<Mutex<Option<Vec<u8>>>>,
+ }
+
+ let current_frame = Arc::new(Mutex::new(None));
+ let current_frame_clone = Arc::clone(&current_frame);
+
+ // Start frame receiver thread
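+ // Latest-frame-wins: each received frame overwrites the previous one, so
+ // process() always serves the most recent frame rather than a backlog.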
+ let running_clone = Arc::clone(&running);
+ let _frame_thread = thread::spawn(move || {
+ // Receiver<T> is Send, so it can be moved into this thread directly;
+ // no Arc<Mutex<..>> wrapper is needed for a single consumer.
+ while running_clone.load(Ordering::Relaxed) {
+ let frame_data = match frame_receiver.recv_timeout(std::time::Duration::from_millis(16)) {
+ Ok(data) => Some(data),
+ Err(mpsc::RecvTimeoutError::Timeout) => None,
+ Err(mpsc::RecvTimeoutError::Disconnected) => break,
+ };
+
+ if let Some(frame_data) = frame_data {
+ let mut frame_guard = current_frame_clone.lock().unwrap();
+ *frame_guard = Some(frame_data);
+ trace!("Received new frame for PipeWire processing");
+ }
+ }
+ });
+
+ // Create a stream that will act as a video source
+ let stream = match Stream::new(
+ &core,
+ &config.node_name,
+ properties! {
+ // Essential keys for Video/Source classification
+ *keys::MEDIA_CLASS => "Video/Source",
+ *keys::NODE_NAME => config.node_name.as_str(),
+ *keys::APP_NAME => "geek-szitman-supercamera",
+ *keys::NODE_DESCRIPTION => config.description.as_str(),
+ // Additional metadata
+ "media.role" => "Camera",
+ "media.category" => "Capture",
+ // Optional cosmetics
+ "media.nick" => "SuperCamera",
+ "device.icon_name" => "camera-web",
+ // Prevent PipeWire from trying to drive the graph until someone connects
+ "node.passive" => "true",
+ },
+ ) {
+ Ok(s) => s,
+ Err(e) => {
+ error!("Failed to create PipeWire stream: {}", e);
+ return;
+ }
+ };
+
+ // Build EnumFormat pod(s) - simplified to just MJPEG
+ let width_u = config.width as u32;
+ let height_u = config.height as u32;
+ let fps_u = config.framerate as u32;
+
+ // MJPEG: JPEG compressed - simplified format
+ let enum_mjpeg = pod::object!(
+ SpaTypes::ObjectParamFormat,
+ ParamType::EnumFormat,
+ pod::property!(FormatProperties::MediaType, Id, MediaType::Video),
+ pod::property!(FormatProperties::MediaSubtype, Id, MediaSubtype::Mjpg),
+ pod::property!(
+ FormatProperties::VideoSize,
+ Choice, Range, Rectangle,
+ Rectangle { width: width_u, height: height_u },
+ Rectangle { width: 16, height: 16 },
+ Rectangle { width: 4096, height: 4096 }
+ ),
+ pod::property!(
+ FormatProperties::VideoFramerate,
+ Choice, Range, Fraction,
+ Fraction { num: fps_u, denom: 1 },
+ Fraction { num: 1, denom: 1 },
+ Fraction { num: 120, denom: 1 }
+ ),
+ );
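+ // Each SPA Range choice above is (default, min, max): the stream advertises
+ // a preferred size/framerate plus the envelope a consumer may negotiate within.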
+
+ // Clone config values for closures
+ let config_width = config.width;
+ let config_height = config.height;
+ let config_framerate = config.framerate;
+
+ // Set up stream callbacks
+ let _listener = match stream
+ .add_local_listener_with_user_data(UserData {
+ is_mjpeg: false,
+ frame_size: 4 * 1024 * 1024, // safe cap
+ stride: 0,
+ current_frame: Arc::clone(&current_frame),
+ })
+ .state_changed(move |stream, _user_data, old, new| {
+ info!("PipeWire stream state: {:?} -> {:?}", old, new);
+ if matches!(new, StreamState::Paused | StreamState::Streaming) {
+ info!("PipeWire node is ready and can be targeted by applications");
+ }
+ if new == StreamState::Paused {
+ if let Err(e) = stream.set_active(true) {
+ error!("Failed to activate PipeWire stream: {}", e);
+ } else {
+ info!("Activated stream scheduling");
+ }
+ }
+ if new == StreamState::Streaming {
+ info!("Stream is now streaming - virtual camera is active!");
+ }
+ })
+ .param_changed(move |stream, user_data, id, param| {
+ if let Some(param) = param {
+ info!("Param changed: id={:?}, type={:?}, raw_id={}", id, param.type_(), id);
+
+ // Handle format negotiation - simplified approach
+ if id == ParamType::Format.as_raw() || id == ParamType::EnumFormat.as_raw() || id == 15 {
+ info!("Format param received (id={}), setting up basic MJPEG format...", id);
+
+ // Set basic MJPEG parameters
+ user_data.is_mjpeg = true;
+ user_data.frame_size = 4 * 1024 * 1024; // 4MB safe cap for MJPEG
+ user_data.stride = 0; // MJPEG doesn't have stride
+ info!("Basic MJPEG format configured: {}x{} @ {} fps", config_width, config_height, config_framerate);
+
+ // Try to activate the stream directly
+ if let Err(e) = stream.set_active(true) {
+ error!("Failed to activate stream: {}", e);
+ } else {
+ info!("Stream activated successfully");
+ }
+ } else {
+ trace!("Stream param changed: id={} (ignored)", id);
+ }
+ } else {
+ trace!("Stream param changed: id={} (no param payload)", id);
+ }
+ })
+ .process(move |stream, user_data| {
+ // Dequeue buffer
+ let Some(mut buffer) = stream.dequeue_buffer() else {
+ trace!("Out of buffers");
+ return;
+ };
+
+ // Get the current frame from UPP protocol
+ let frame_data = {
+ let frame_guard = user_data.current_frame.lock().unwrap();
+ frame_guard.clone()
+ };
+
+ if let Some(frame_data) = frame_data {
+ // Process actual camera frame data from UPP protocol
+ trace!("Processing UPP camera frame: {} bytes", frame_data.len());
+
+ for data in buffer.datas_mut() {
+ if let Some(mem) = data.data() {
+ let len = mem.len();
+
+ if !user_data.is_mjpeg {
+ // Handle raw formats (RGBx or I420)
+ let w = config_width as usize;
+ let h = config_height as usize;
+ let stride = user_data.stride as usize;
+
+ if frame_data.len() >= w * h * 3 {
+ // Convert RGB to RGBA
+ let mut off = 0usize;
+ for y in 0..h {
+ let row_end = (off + stride).min(len);
+ let row = &mut mem[off..row_end];
+
+ for x in 0..w.min(row.len()/4) {
+ let src_idx = (y * w + x) * 3;
+ if src_idx + 2 < frame_data.len() {
+ row[x * 4 + 0] = frame_data[src_idx + 0]; // R
+ row[x * 4 + 1] = frame_data[src_idx + 1]; // G
+ row[x * 4 + 2] = frame_data[src_idx + 2]; // B
+ row[x * 4 + 3] = 255; // A
+ } else {
+ row[x * 4 + 0] = 0; // R
+ row[x * 4 + 1] = 0; // G
+ row[x * 4 + 2] = 0; // B
+ row[x * 4 + 3] = 255; // A
+ }
+ }
+
+ off = off.saturating_add(stride);
+ if off >= len { break; }
+ }
+
+ *data.chunk_mut().size_mut() = (w * h * 4) as u32;
+ *data.chunk_mut().stride_mut() = user_data.stride;
+ } else {
+ // Frame data too small, fill with black
+ mem[..len].fill(0);
+ *data.chunk_mut().size_mut() = len as u32;
+ *data.chunk_mut().stride_mut() = user_data.stride;
+ }
+ } else {
+ // Handle MJPEG format - copy JPEG data directly
+ if frame_data.len() <= len {
+ mem[..frame_data.len()].copy_from_slice(&frame_data);
+ *data.chunk_mut().size_mut() = frame_data.len() as u32;
+ trace!("Copied MJPEG frame: {} bytes", frame_data.len());
+ } else {
+ // Frame too large for buffer, truncate
+ mem[..len].copy_from_slice(&frame_data[..len]);
+ *data.chunk_mut().size_mut() = len as u32;
+ warn!("MJPEG frame truncated: {} -> {} bytes", frame_data.len(), len);
+ }
+ }
+ }
+ }
+ } else {
+ // No frame data available, generate black frame as fallback
+ trace!("No UPP frame data available, generating black frame");
+
+ for data in buffer.datas_mut() {
+ if let Some(mem) = data.data() {
+ let len = mem.len();
+
+ if !user_data.is_mjpeg {
+ // Fill with black for raw formats
+ mem[..len].fill(0);
+
+ let w = config_width as usize;
+ let h = config_height as usize;
+ *data.chunk_mut().size_mut() = (w * h * 4) as u32;
+ *data.chunk_mut().stride_mut() = user_data.stride;
+ } else {
+ // Generate minimal valid 1x1 black JPEG for MJPEG format
+ // This is a minimal valid JPEG that represents a 1x1 black pixel
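+ // Marker layout: FFD8 (SOI), FFE0 (JFIF APP0), FFDB (quantization table),
+ // FFC0 (SOF0 header for a 1x1 image), FFC4 (Huffman table), FFDA (SOS
+ // plus one scan byte), FFD9 (EOI).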
+ let minimal_jpeg: [u8; 143] = [
+ 0xFF, 0xD8, 0xFF, 0xE0, 0x00, 0x10, 0x4A, 0x46, 0x49, 0x46, 0x00, 0x01,
+ 0x01, 0x01, 0x00, 0x48, 0x00, 0x48, 0x00, 0x00, 0xFF, 0xDB, 0x00, 0x43,
+ 0x00, 0x08, 0x06, 0x06, 0x07, 0x06, 0x05, 0x08, 0x07, 0x07, 0x07, 0x09,
+ 0x09, 0x08, 0x0A, 0x0C, 0x14, 0x0D, 0x0C, 0x0B, 0x0B, 0x0C, 0x19, 0x12,
+ 0x13, 0x0F, 0x14, 0x1D, 0x1A, 0x1F, 0x1E, 0x1D, 0x1A, 0x1C, 0x1C, 0x20,
+ 0x24, 0x2E, 0x27, 0x20, 0x22, 0x2C, 0x23, 0x1C, 0x1C, 0x28, 0x37, 0x29,
+ 0x2C, 0x30, 0x31, 0x34, 0x34, 0x34, 0x1F, 0x27, 0x39, 0x3D, 0x38, 0x32,
+ 0x3C, 0x2E, 0x33, 0x34, 0x32, 0xFF, 0xC0, 0x00, 0x11, 0x08, 0x00, 0x01,
+ 0x00, 0x01, 0x01, 0x01, 0x11, 0x00, 0x02, 0x11, 0x01, 0x03, 0x11, 0x01,
+ 0xFF, 0xC4, 0x00, 0x14, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+ 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x08, 0xFF, 0xDA,
+ 0x00, 0x08, 0x01, 0x01, 0x00, 0x00, 0x3F, 0x00, 0x37, 0xFF, 0xD9
+ ];
+
+ let copy_len = minimal_jpeg.len().min(len);
+ mem[..copy_len].copy_from_slice(&minimal_jpeg[..copy_len]);
+ *data.chunk_mut().size_mut() = copy_len as u32;
+ trace!("Generated minimal 1x1 black JPEG placeholder: {} bytes, chunk size set to {}", copy_len, copy_len);
+ }
+ }
+ }
+ }
+
+ // Debug: Log chunk sizes before queuing
+ for (i, data) in buffer.datas_mut().iter_mut().enumerate() {
+ let chunk = data.chunk_mut();
+ trace!("Buffer {} chunk {}: size={}, stride={}", i, i, chunk.size(), chunk.stride());
+ }
+
+ // Return buffer to stream by dropping it (this automatically queues it)
+ // The Buffer struct implements Drop which handles the queuing
+ drop(buffer);
+ })
+ .register()
+ {
+ Ok(l) => l,
+ Err(e) => {
+ error!("Failed to register stream listener: {}", e);
+ return;
+ }
+ };
+
+ // Connect as an output (we are a source). Use MAP_BUFFERS only, not DRIVER.
+ // Serialize EnumFormat pods to bytes and build &Pod slice
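+ // PodSerializer::serialize returns the writer plus the number of bytes
+ // written; the Vecs in enum_bytes must stay alive while the borrowed
+ // &Pod views below are in use.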
+ let obj_to_pod = |obj: Object| -> Vec<u8> {
+ let value = pod::Value::Object(obj);
+ PodSerializer::serialize(std::io::Cursor::new(Vec::new()), &value)
+ .unwrap()
+ .0
+ .into_inner()
+ };
+ let enum_bytes: Vec<Vec<u8>> = vec![obj_to_pod(enum_mjpeg)];
+ let mut enum_pods: Vec<&Pod> = enum_bytes.iter().map(|b| Pod::from_bytes(b).unwrap()).collect();
+
+ if let Err(e) = stream.connect(
+ Direction::Output,
+ None,
+ StreamFlags::MAP_BUFFERS,
+ &mut enum_pods[..],
+ ) {
+ error!("Failed to connect PipeWire stream: {}", e);
+ return;
+ }
+
+ info!("Stream connected successfully");
+ info!("Virtual camera node '{}' is connecting to PipeWire", config.node_name);
+ info!("Other applications can now attempt to negotiate formats");
+
+ // Wait for our node to appear in the registry and capture object.serial
+ let t0 = std::time::Instant::now();
+ while serial_slot.load(Ordering::SeqCst) == 0 && t0.elapsed() < std::time::Duration::from_millis(1500) {
+ mainloop.loop_().iterate(std::time::Duration::from_millis(10));
+ }
+ let serial_logged = serial_slot.load(Ordering::SeqCst);
+ if serial_logged != 0 {
+ info!("You can target this node with: target-object={} or target-object={}", serial_logged, config.node_name);
+ } else {
+ warn!("Node serial not observed yet in registry; it may appear a bit later.");
+ }
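+
+ // Example consumer (assumes GStreamer's PipeWire plugin is installed;
+ // jpegdec matches the MJPEG format negotiated above):
+ //   gst-launch-1.0 pipewiresrc target-object=<serial-or-name> ! jpegdec ! videoconvert ! autovideosink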
+
+ // Run main loop until told to stop
+ info!("Starting main loop iteration...");
+ let mut iteration_count = 0;
+ while running.load(Ordering::Relaxed) {
+ iteration_count += 1;
+ if iteration_count % 1000 == 0 {
+ info!("Main loop iteration: {}", iteration_count);
+ }
+
+ // Drive loop
+ let result = mainloop.loop_().iterate(std::time::Duration::from_millis(16));
+ if result < 0 {
+ error!("Main loop iteration failed with result: {}", result);
+ break;
+ }
+ }
+ info!("Main loop exited after {} iterations", iteration_count);
+ }
+
+ /// Start frame processing thread
+ fn start_frame_processor(&mut self) -> Result<()> {
+ let (tx, rx) = mpsc::channel();
+ self.stats_frame_sender = Some(tx); // Use separate sender for stats
+
+ let running = Arc::clone(&self.running);
+ let stats = Arc::clone(&self.stats);
+ let last_frame_time = Arc::clone(&self.last_frame_time);
+
+ thread::spawn(move || {
+ Self::frame_processing_loop(rx, running, stats, last_frame_time);
+ });
+
+ info!("Frame processing thread started");
+ Ok(())
+ }
+
+ /// Frame processing loop that runs in a separate thread
+ fn frame_processing_loop(
+ rx: Receiver<Vec<u8>>,
+ running: Arc<AtomicBool>,
+ stats: Arc<Mutex<VideoStats>>,
+ last_frame_time: Arc<Mutex<Instant>>,
+ ) {
+ while running.load(Ordering::Relaxed) {
+ match rx.recv_timeout(std::time::Duration::from_millis(100)) {
+ Ok(frame_data) => {
+ // Process frame and update statistics
+ Self::update_stats(&stats, &last_frame_time, frame_data.len());
+ trace!("Frame processed: {} bytes", frame_data.len());
+ }
+ Err(mpsc::RecvTimeoutError::Timeout) => {
+ continue;
+ }
+ Err(mpsc::RecvTimeoutError::Disconnected) => {
+ break;
+ }
+ }
+ }
+ }
+
+ /// Update statistics with proper FPS calculation
+ fn update_stats(
+ stats: &Arc<Mutex<VideoStats>>,
+ last_frame_time: &Arc<Mutex<Instant>>,
+ frame_size: usize,
+ ) {
+ let mut stats_guard = stats.lock().unwrap();
+ stats_guard.frames_pushed += 1;
+ stats_guard.total_bytes += frame_size as u64;
+ stats_guard.backend_type = super::VideoBackendType::PipeWire;
+ stats_guard.is_ready = true;
+
+ let now = Instant::now();
+ let mut last_time = last_frame_time.lock().unwrap();
+ let duration = now.duration_since(*last_time);
+ *last_time = now;
+
+ // Use sub-millisecond precision; as_millis() truncates, which skips
+ // frames arriving less than 1 ms apart and quantizes the rate.
+ let secs = duration.as_secs_f64();
+ if secs > 0.0 {
+ stats_guard.fps = 1.0 / secs;
+ }
+ }
+
+ /// Get current node information for external tools
+ pub fn get_node_info(&self) -> Option<(u32, u32, String)> {
+ // This would need to be implemented with proper synchronization
+ // For now, return the config info
+ Some((
+ self.virtual_node_id.unwrap_or(0),
+ 0, // object.serial - would need to be stored from the stream
+ self.config.node_name.clone(),
+ ))
+ }
+
+ /// Get the object.serial for targeting (if available)
+ pub fn get_object_serial(&self) -> Option<u32> {
+ // This would need to be implemented with proper synchronization
+ // For now, return None - the serial is logged when discovered
+ None
+ }
+
+ /// Check if the virtual camera node is registered and discoverable
+ pub fn is_node_registered(&self) -> bool {
+ self.is_initialized && self.running.load(Ordering::Relaxed)
+ }
+}
+
+impl VideoBackendTrait for PipeWireBackend {
+ fn initialize(&mut self) -> Result<()> {
+ if self.is_initialized {
+ return Ok(());
+ }
+
+ info!("Initializing PipeWire backend with native library...");
+
+ if let Err(e) = self.check_pipewire_available() {
+ error!("PipeWire not available: {}", e);
+ return Err(VideoError::DeviceNotReady.into());
+ }
+
+ if let Err(e) = self.create_virtual_camera_node() {
+ error!("Failed to create virtual camera node: {}", e);
+ return Err(VideoError::DeviceNotReady.into());
+ }
+
+ if let Err(e) = self.start_frame_processor() {
+ error!("Failed to start frame processor: {}", e);
+ return Err(VideoError::DeviceNotReady.into());
+ }
+
+ self.is_initialized = true;
+ self.running.store(true, Ordering::Relaxed);
+ info!("PipeWire backend initialized successfully with native library");
+ // Node availability itself is logged from the PipeWire thread once the
+ // node actually exists; logging it here would be premature.
+
+ Ok(())
+ }
+
+ fn push_frame(&self, frame_data: &[u8]) -> Result<()> {
+ if !self.running.load(Ordering::Relaxed) {
+ return Err(VideoError::DeviceNotReady.into());
+ }
+
+ if !self.is_initialized {
+ return Err(VideoError::DeviceNotReady.into());
+ }
+
+ trace!("Queueing frame for PipeWire: {} bytes", frame_data.len());
+
+ // Send to PipeWire thread
+ if let Some(sender) = &self.pw_frame_sender {
+ if let Err(e) = sender.send(frame_data.to_vec()) {
+ error!("Failed to queue frame to PipeWire: {}", e);
+ return Err(VideoError::DeviceNotReady.into());
+ }
+ debug!("Frame queued for PipeWire processing: {} bytes", frame_data.len());
+ } else {
+ error!("PipeWire frame sender not available");
+ return Err(VideoError::DeviceNotReady.into());
+ }
+
+ // Send to stats thread
+ if let Some(sender) = &self.stats_frame_sender {
+ if let Err(e) = sender.send(frame_data.to_vec()) {
+ error!("Failed to queue frame to stats: {}", e);
+ // Don't fail the entire operation for stats
+ }
+ }
+
+ trace!("Frame queued successfully");
+ Ok(())
+ }
+
+ fn get_stats(&self) -> VideoStats {
+ self.stats.lock().unwrap().clone()
+ }
+
+ fn is_ready(&self) -> bool {
+ self.is_initialized && self.running.load(Ordering::Relaxed)
+ }
+
+ fn shutdown(&mut self) -> Result<()> {
+ if !self.is_initialized {
+ return Ok(());
+ }
+
+ info!("Shutting down PipeWire backend...");
+
+ self.running.store(false, Ordering::Relaxed);
+
+ if let Some(handle) = self.pw_thread.take() {
+ if let Err(e) = handle.join() {
+ error!("Error joining PipeWire thread: {:?}", e);
+ }
+ }
+
+ self.virtual_node_id = None;
+ self.pw_frame_sender = None;
+ self.stats_frame_sender = None;
+
+ self.is_initialized = false;
+ info!("PipeWire backend shut down successfully");
+
+ Ok(())
+ }
+}
+
+impl Drop for PipeWireBackend {
+ fn drop(&mut self) {
+ self.running.store(false, Ordering::Relaxed);
+
+ if let Some(handle) = self.pw_thread.take() {
+ let _ = handle.join();
+ }
+
+ // Note: frame_senders will be dropped automatically
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_pipewire_backend_creation() {
+ let config = PipeWireConfig::default();
+ let backend = PipeWireBackend::new(config);
+
+ assert_eq!(backend.config.node_name, "geek-szitman-supercamera");
+ assert_eq!(backend.config.description, "Geek Szitman SuperCamera - High-quality virtual camera for streaming and recording");
+ assert_eq!(backend.config.media_class, "Video/Source");
+ assert_eq!(backend.config.format, VideoFormat::MJPEG);
+ assert_eq!(backend.config.width, 640);
+ assert_eq!(backend.config.height, 480);
+ assert_eq!(backend.config.framerate, 30);
+ }
+
+ #[test]
+ fn test_pipewire_backend_default_config() {
+ let config = PipeWireConfig::default();
+ let backend = PipeWireBackend::new(config);
+
+ assert!(!backend.is_initialized);
+ assert!(!backend.is_ready());
+ }
+}