1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
|
//! PipeWire backend for video streaming using native library
use super::{VideoBackendTrait, VideoFormat, VideoStats};
use crate::error::{Result, VideoError};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering, Ordering as AtomicOrdering};
use std::sync::Mutex;
use std::time::Instant;
use tracing::{debug, info, trace, error, warn};
use std::thread;
use std::sync::mpsc::{self, Sender, Receiver};
// PipeWire imports
use pipewire::{
main_loop::MainLoop,
context::Context,
stream::{Stream, StreamFlags, StreamState},
properties::properties,
keys,
spa::pod::{self, Pod, Object},
spa::utils::{Direction, SpaTypes, Fraction, Rectangle},
spa::param::ParamType,
spa::param::format::{FormatProperties, MediaSubtype, MediaType},
spa::pod::serialize::PodSerializer,
};
/// PipeWire backend implementation using native library
///
/// All PipeWire objects live on a dedicated thread (`pw_thread`) because
/// they are not Send/Sync; this struct holds only the channel endpoints
/// and shared flags used to communicate with that thread.
pub struct PipeWireBackend {
    // True once initialize() completed successfully; cleared on shutdown().
    is_initialized: bool,
    // Shared statistics, updated by the stats worker thread.
    stats: Arc<Mutex<VideoStats>>,
    // Configuration captured at construction time.
    config: PipeWireConfig,
    // Cooperative shutdown flag polled by every worker loop.
    running: Arc<AtomicBool>,
    // NOTE(review): currently set to a placeholder (999) at creation;
    // the real node id is never written back from the PipeWire thread.
    virtual_node_id: Option<u32>,
    pw_frame_sender: Option<Sender<Vec<u8>>>, // Separate sender for PipeWire thread
    stats_frame_sender: Option<Sender<Vec<u8>>>, // Separate sender for stats thread
    // Timestamp of the previously processed frame, used for FPS derivation.
    last_frame_time: Arc<Mutex<Instant>>,
    // PipeWire objects - these need to be in a separate thread-safe context
    pw_thread: Option<thread::JoinHandle<()>>,
}
/// PipeWire configuration
///
/// Describes the virtual camera node advertised to the PipeWire graph:
/// its identity strings and the video mode it offers.
#[derive(Debug, Clone)]
pub struct PipeWireConfig {
    // node.name property; what `target-object=` can match by name.
    pub node_name: String,
    // Human-readable node.description shown by desktop tooling.
    pub description: String,
    // Media class string, e.g. "Video/Source".
    // NOTE(review): this field is not read when the stream properties are
    // built (the class is hard-coded there) - confirm that is intentional.
    pub media_class: String,
    // Compression/pixel format offered (MJPEG by default).
    pub format: VideoFormat,
    // Preferred frame width in pixels.
    pub width: u32,
    // Preferred frame height in pixels.
    pub height: u32,
    // Preferred frame rate in frames per second.
    pub framerate: u32,
}
impl Default for PipeWireConfig {
    /// Sensible defaults for the virtual camera: 640x480 MJPEG at 30 fps.
    fn default() -> Self {
        // MJPEG matches what the physical camera provides, so no
        // transcoding is required by default.
        let format = VideoFormat::MJPEG;
        Self {
            node_name: String::from("geek-szitman-supercamera"),
            description: String::from("Geek Szitman SuperCamera - High-quality virtual camera for streaming and recording"),
            media_class: String::from("Video/Source"),
            format,
            width: 640,
            height: 480,
            framerate: 30,
        }
    }
}
impl PipeWireBackend {
/// Construct an uninitialized backend from the given configuration.
///
/// No PipeWire resources are allocated here; call `initialize()` to spawn
/// the PipeWire thread and create the virtual camera node.
pub fn new(config: PipeWireConfig) -> Self {
    let stats = Arc::new(Mutex::new(VideoStats::default()));
    let running = Arc::new(AtomicBool::new(false));
    let last_frame_time = Arc::new(Mutex::new(Instant::now()));
    Self {
        is_initialized: false,
        stats,
        config,
        running,
        virtual_node_id: None,
        pw_frame_sender: None,
        stats_frame_sender: None,
        last_frame_time,
        pw_thread: None,
    }
}
/// Check if PipeWire is available and running
///
/// NOTE(review): this is currently a stub that always succeeds - it does
/// not actually probe the daemon. A real check would attempt a connection
/// and surface the failure; the actual connection errors are only caught
/// later on the PipeWire thread.
fn check_pipewire_available(&self) -> Result<()> {
    info!("Checking PipeWire availability...");
    // This is a basic check - in a real implementation you might want to
    // try to connect to the daemon to verify it's actually running
    info!("PipeWire availability check passed");
    Ok(())
}
/// Create a virtual camera node using native PipeWire API
///
/// Spawns a dedicated thread running `pipewire_main_loop`, because the
/// PipeWire objects are not Send/Sync and must live on one thread. Frames
/// are forwarded to that thread via the channel stored in
/// `pw_frame_sender`. Setup errors inside the thread are only logged;
/// this function itself always returns `Ok`.
fn create_virtual_camera_node(&mut self) -> Result<()> {
    info!("Creating PipeWire virtual camera node using native API...");
    info!("Node name: '{}'", self.config.node_name);
    info!("Node description: '{}'", self.config.description);
    // Start PipeWire processing in a separate thread to avoid Send/Sync issues
    // The actual node creation and availability will be logged in the PipeWire thread
    // Ensure the processing loop runs
    self.running.store(true, Ordering::Relaxed);
    let running = Arc::clone(&self.running);
    let config = self.config.clone();
    // Create channel for frame communication with PipeWire thread
    let (frame_sender, frame_receiver) = mpsc::channel();
    self.pw_frame_sender = Some(frame_sender);
    info!("Starting PipeWire thread...");
    let handle = thread::spawn(move || {
        info!("PipeWire thread started, entering main loop...");
        // Set panic hook to catch any panics in this thread
        // NOTE(review): std::panic::set_hook is process-global, not
        // thread-local - this replaces the panic hook for the whole
        // program. Confirm that is acceptable, or use catch_unwind.
        std::panic::set_hook(Box::new(|panic_info| {
            error!("PipeWire thread panicked: {:?}", panic_info);
        }));
        Self::pipewire_main_loop(running, config, frame_receiver);
        info!("PipeWire thread exiting...");
    });
    self.pw_thread = Some(handle);
    self.virtual_node_id = Some(999); // Placeholder - will be updated when stream is ready
    info!("Virtual camera node creation initiated in separate thread");
    Ok(())
}
/// Main PipeWire loop that runs in a separate thread
///
/// Owns every PipeWire object (main loop, context, core, registry,
/// stream): these types are not Send/Sync, so they must be created and
/// driven on this thread. Incoming frames arrive over `frame_receiver`,
/// are staged in a shared `current_frame` slot by a small helper thread,
/// and are copied into stream buffers from the `process` callback. The
/// loop exits when `running` is cleared or a fatal setup error occurs.
///
/// Fix applied: two occurrences of `Arc::clone(¤t_frame)` were
/// mojibake for `Arc::clone(&current_frame)` (`&curr` was eaten by the
/// HTML entity `&curren;`); restored so the function compiles.
fn pipewire_main_loop(running: Arc<AtomicBool>, config: PipeWireConfig, frame_receiver: Receiver<Vec<u8>>) {
    info!("Starting PipeWire main loop in thread");
    info!("Initializing PipeWire...");
    pipewire::init();
    info!("PipeWire initialized successfully");
    // Create main loop with no properties
    info!("Creating PipeWire main loop...");
    let mainloop = match MainLoop::new(None) {
        Ok(ml) => {
            info!("Main loop created successfully");
            ml
        },
        Err(e) => {
            error!("Failed to create PipeWire main loop: {}", e);
            error!("MainLoop::new error details: {:?}", e);
            return;
        }
    };
    // Create context
    info!("Creating PipeWire context...");
    let context = match Context::new(&mainloop) {
        Ok(ctx) => {
            info!("Context created successfully");
            ctx
        },
        Err(e) => {
            error!("Failed to create PipeWire context: {}", e);
            return;
        }
    };
    // Connect to PipeWire daemon
    info!("Connecting to PipeWire daemon...");
    let core = match context.connect(None) {
        Ok(c) => {
            info!("Connected to PipeWire daemon successfully");
            c
        },
        Err(e) => {
            error!("Failed to connect to PipeWire daemon: {}", e);
            return;
        }
    };
    info!("PipeWire connection established successfully");
    // Set up registry listener to capture object.serial when our node appears
    let serial_slot = Arc::new(AtomicU32::new(0));
    let serial_slot_clone = Arc::clone(&serial_slot);
    let wanted_name = config.node_name.clone();
    let registry = core.get_registry().expect("get_registry");
    let _reg_listener = registry
        .add_listener_local()
        .global(move |global_obj| {
            if global_obj.type_ == pipewire::types::ObjectType::Node {
                if let Some(props) = &global_obj.props {
                    if let Some(name) = props.get("node.name") {
                        if name == wanted_name {
                            if let Some(s) = props.get("object.serial") {
                                if let Ok(v) = s.parse::<u32>() {
                                    serial_slot_clone.store(v, AtomicOrdering::SeqCst);
                                    info!("Discovered our node in registry: node.name={} object.serial={}", name, v);
                                }
                            }
                        }
                    }
                }
            }
        })
        .register();
    // User data for stream callbacks
    #[derive(Debug)]
    struct UserData {
        // True once format negotiation selected MJPEG (compressed path).
        is_mjpeg: bool,
        // Upper bound on a single frame's size in bytes.
        frame_size: u32,
        // Row stride for raw formats; 0 for MJPEG.
        stride: i32,
        // Latest frame staged by the receiver thread below.
        current_frame: Arc<Mutex<Option<Vec<u8>>>>,
    }
    let current_frame = Arc::new(Mutex::new(None));
    let current_frame_clone = Arc::clone(&current_frame);
    // Start frame receiver thread
    let frame_receiver = Arc::new(Mutex::new(frame_receiver));
    let frame_receiver_clone = Arc::clone(&frame_receiver);
    let running_clone = Arc::clone(&running);
    let _frame_thread = thread::spawn(move || {
        while running_clone.load(Ordering::Relaxed) {
            let frame_data = {
                let receiver_guard = frame_receiver_clone.lock().unwrap();
                match receiver_guard.recv_timeout(std::time::Duration::from_millis(16)) {
                    Ok(data) => Some(data),
                    Err(mpsc::RecvTimeoutError::Timeout) => None,
                    Err(mpsc::RecvTimeoutError::Disconnected) => break,
                }
            };
            if let Some(frame_data) = frame_data {
                let mut frame_guard = current_frame_clone.lock().unwrap();
                *frame_guard = Some(frame_data);
                trace!("Received new frame for PipeWire processing");
            }
        }
    });
    // Create a stream that will act as a video source
    let stream = match Stream::new(
        &core,
        &config.node_name,
        properties! {
            // Essential keys for Video/Source classification
            *keys::MEDIA_CLASS => "Video/Source",
            *keys::NODE_NAME => config.node_name.as_str(),
            *keys::APP_NAME => "geek-szitman-supercamera",
            *keys::NODE_DESCRIPTION => config.description.as_str(),
            // Additional metadata
            "media.role" => "Camera",
            "media.category" => "Capture",
            // Optional cosmetics
            "media.nick" => "SuperCamera",
            "device.icon_name" => "camera-web",
            // Prevent PipeWire from trying to drive the graph until someone connects
            "node.passive" => "true",
        },
    ) {
        Ok(s) => s,
        Err(e) => {
            error!("Failed to create PipeWire stream: {}", e);
            return;
        }
    };
    // Build EnumFormat pod(s) - simplified to just MJPEG
    let width_u = config.width as u32;
    let height_u = config.height as u32;
    let fps_u = config.framerate as u32;
    // MJPEG: JPEG compressed - simplified format
    let enum_mjpeg = pod::object!(
        SpaTypes::ObjectParamFormat,
        ParamType::EnumFormat,
        pod::property!(FormatProperties::MediaType, Id, MediaType::Video),
        pod::property!(FormatProperties::MediaSubtype, Id, MediaSubtype::Mjpg),
        pod::property!(
            FormatProperties::VideoSize,
            Choice, Range, Rectangle,
            Rectangle { width: width_u, height: height_u },
            Rectangle { width: 16, height: 16 },
            Rectangle { width: 4096, height: 4096 }
        ),
        pod::property!(
            FormatProperties::VideoFramerate,
            Choice, Range, Fraction,
            Fraction { num: fps_u, denom: 1 },
            Fraction { num: 1, denom: 1 },
            Fraction { num: 120, denom: 1 }
        ),
    );
    // Clone config values for closures
    let config_width = config.width;
    let config_height = config.height;
    let config_framerate = config.framerate;
    // Set up stream callbacks
    let _listener = match stream
        .add_local_listener_with_user_data(UserData {
            is_mjpeg: false,
            frame_size: 4 * 1024 * 1024, // safe cap
            stride: 0,
            current_frame: Arc::clone(&current_frame),
        })
        .state_changed(move |stream, _user_data, old, new| {
            info!("PipeWire stream state: {:?} -> {:?}", old, new);
            if matches!(new, StreamState::Paused | StreamState::Streaming) {
                info!("PipeWire node is ready and can be targeted by applications");
            }
            if new == StreamState::Paused {
                if let Err(e) = stream.set_active(true) {
                    error!("Failed to activate PipeWire stream: {}", e);
                } else {
                    info!("Activated stream scheduling");
                }
            }
            if new == StreamState::Streaming {
                info!("Stream is now streaming - virtual camera is active!");
            }
        })
        .param_changed(move |stream, user_data, id, param| {
            if let Some(param) = param {
                info!("Param changed: id={:?}, type={:?}, raw_id={}", id, param.type_(), id);
                // Handle format negotiation - simplified approach
                if id == ParamType::Format.as_raw() || id == ParamType::EnumFormat.as_raw() || id == 15 {
                    info!("Format param received (id={}), setting up basic MJPEG format...", id);
                    // Set basic MJPEG parameters
                    user_data.is_mjpeg = true;
                    user_data.frame_size = 4 * 1024 * 1024; // 4MB safe cap for MJPEG
                    user_data.stride = 0; // MJPEG doesn't have stride
                    info!("Basic MJPEG format configured: {}x{} @ {} fps", config_width, config_height, config_framerate);
                    // Try to activate the stream directly
                    if let Err(e) = stream.set_active(true) {
                        error!("Failed to activate stream: {}", e);
                    } else {
                        info!("Stream activated successfully");
                    }
                } else {
                    trace!("Stream param changed: id={} (ignored)", id);
                }
            } else {
                trace!("Stream param changed: id={} (ignored)", id);
            }
        })
        .process(move |stream, user_data| {
            // Dequeue buffer
            let Some(mut buffer) = stream.dequeue_buffer() else {
                trace!("Out of buffers");
                return;
            };
            // Get the current frame from UPP protocol
            let frame_data = {
                let frame_guard = user_data.current_frame.lock().unwrap();
                frame_guard.clone()
            };
            if let Some(frame_data) = frame_data {
                // Process actual camera frame data from UPP protocol
                trace!("Processing UPP camera frame: {} bytes", frame_data.len());
                for data in buffer.datas_mut() {
                    if let Some(mem) = data.data() {
                        let len = mem.len();
                        if !user_data.is_mjpeg {
                            // Handle raw formats (RGBx or I420)
                            let w = config_width as usize;
                            let h = config_height as usize;
                            let stride = user_data.stride as usize;
                            if frame_data.len() >= w * h * 3 {
                                // Convert RGB to RGBA
                                let mut off = 0usize;
                                for y in 0..h {
                                    let row_end = (off + stride).min(len);
                                    let row = &mut mem[off..row_end];
                                    for x in 0..w.min(row.len()/4) {
                                        let src_idx = (y * w + x) * 3;
                                        if src_idx + 2 < frame_data.len() {
                                            row[x * 4 + 0] = frame_data[src_idx + 0]; // R
                                            row[x * 4 + 1] = frame_data[src_idx + 1]; // G
                                            row[x * 4 + 2] = frame_data[src_idx + 2]; // B
                                            row[x * 4 + 3] = 255; // A
                                        } else {
                                            row[x * 4 + 0] = 0; // R
                                            row[x * 4 + 1] = 0; // G
                                            row[x * 4 + 2] = 0; // B
                                            row[x * 4 + 3] = 255; // A
                                        }
                                    }
                                    off = off.saturating_add(stride);
                                    if off >= len { break; }
                                }
                                *data.chunk_mut().size_mut() = (w * h * 4) as u32;
                                *data.chunk_mut().stride_mut() = user_data.stride;
                            } else {
                                // Frame data too small, fill with black
                                for i in 0..len {
                                    mem[i] = 0;
                                }
                                *data.chunk_mut().size_mut() = len as u32;
                                *data.chunk_mut().stride_mut() = user_data.stride;
                            }
                        } else {
                            // Handle MJPEG format - copy JPEG data directly
                            if frame_data.len() <= len {
                                mem[..frame_data.len()].copy_from_slice(&frame_data);
                                *data.chunk_mut().size_mut() = frame_data.len() as u32;
                                trace!("Copied MJPEG frame: {} bytes", frame_data.len());
                            } else {
                                // Frame too large for buffer, truncate
                                mem[..len].copy_from_slice(&frame_data[..len]);
                                *data.chunk_mut().size_mut() = len as u32;
                                warn!("MJPEG frame truncated: {} -> {} bytes", frame_data.len(), len);
                            }
                        }
                    }
                }
            } else {
                // No frame data available, generate black frame as fallback
                trace!("No UPP frame data available, generating black frame");
                for data in buffer.datas_mut() {
                    if let Some(mem) = data.data() {
                        let len = mem.len();
                        if !user_data.is_mjpeg {
                            // Fill with black for raw formats
                            for i in 0..len {
                                mem[i] = 0;
                            }
                            let w = config_width as usize;
                            let h = config_height as usize;
                            *data.chunk_mut().size_mut() = (w * h * 4) as u32;
                            *data.chunk_mut().stride_mut() = user_data.stride;
                        } else {
                            // Generate minimal valid 1x1 black JPEG for MJPEG format
                            // This is a minimal valid JPEG that represents a 1x1 black pixel
                            let minimal_jpeg: [u8; 143] = [
                                0xFF, 0xD8, 0xFF, 0xE0, 0x00, 0x10, 0x4A, 0x46, 0x49, 0x46, 0x00, 0x01,
                                0x01, 0x01, 0x00, 0x48, 0x00, 0x48, 0x00, 0x00, 0xFF, 0xDB, 0x00, 0x43,
                                0x00, 0x08, 0x06, 0x06, 0x07, 0x06, 0x05, 0x08, 0x07, 0x07, 0x07, 0x09,
                                0x09, 0x08, 0x0A, 0x0C, 0x14, 0x0D, 0x0C, 0x0B, 0x0B, 0x0C, 0x19, 0x12,
                                0x13, 0x0F, 0x14, 0x1D, 0x1A, 0x1F, 0x1E, 0x1D, 0x1A, 0x1C, 0x1C, 0x20,
                                0x24, 0x2E, 0x27, 0x20, 0x22, 0x2C, 0x23, 0x1C, 0x1C, 0x28, 0x37, 0x29,
                                0x2C, 0x30, 0x31, 0x34, 0x34, 0x34, 0x1F, 0x27, 0x39, 0x3D, 0x38, 0x32,
                                0x3C, 0x2E, 0x33, 0x34, 0x32, 0xFF, 0xC0, 0x00, 0x11, 0x08, 0x00, 0x01,
                                0x00, 0x01, 0x01, 0x01, 0x11, 0x00, 0x02, 0x11, 0x01, 0x03, 0x11, 0x01,
                                0xFF, 0xC4, 0x00, 0x14, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
                                0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x08, 0xFF, 0xDA,
                                0x00, 0x08, 0x01, 0x01, 0x00, 0x00, 0x3F, 0x00, 0x37, 0xFF, 0xD9
                            ];
                            let copy_len = minimal_jpeg.len().min(len);
                            mem[..copy_len].copy_from_slice(&minimal_jpeg[..copy_len]);
                            *data.chunk_mut().size_mut() = copy_len as u32;
                            trace!("Generated minimal 1x1 black JPEG placeholder: {} bytes, chunk size set to {}", copy_len, copy_len);
                        }
                    }
                }
            }
            // Debug: Log chunk sizes before queuing
            for (i, data) in buffer.datas_mut().iter_mut().enumerate() {
                let chunk = data.chunk_mut();
                trace!("Buffer {} chunk {}: size={}, stride={}", i, i, chunk.size(), chunk.stride());
            }
            // Return buffer to stream by dropping it (this automatically queues it)
            // The Buffer struct implements Drop which handles the queuing
            drop(buffer);
        })
        .register()
    {
        Ok(l) => l,
        Err(e) => {
            error!("Failed to register stream listener: {}", e);
            return;
        }
    };
    // Connect as an output (we are a source). Use MAP_BUFFERS only, not DRIVER.
    // Serialize EnumFormat pods to bytes and build &Pod slice
    let obj_to_pod = |obj: Object| -> Vec<u8> {
        let value = pod::Value::Object(obj);
        PodSerializer::serialize(std::io::Cursor::new(Vec::new()), &value)
            .unwrap()
            .0
            .into_inner()
    };
    let enum_bytes: Vec<Vec<u8>> = vec![obj_to_pod(enum_mjpeg)];
    let mut enum_pods: Vec<&Pod> = enum_bytes.iter().map(|b| Pod::from_bytes(b).unwrap()).collect();
    if let Err(e) = stream.connect(
        Direction::Output,
        None,
        StreamFlags::MAP_BUFFERS,
        &mut enum_pods[..],
    ) {
        error!("Failed to connect PipeWire stream: {}", e);
        return;
    }
    info!("Stream connected successfully");
    info!("Virtual camera node '{}' is connecting to PipeWire", config.node_name);
    info!("Other applications can now attempt to negotiate formats");
    // Wait for our node to appear in the registry and capture object.serial
    let t0 = std::time::Instant::now();
    while serial_slot.load(AtomicOrdering::SeqCst) == 0 && t0.elapsed() < std::time::Duration::from_millis(1500) {
        mainloop.loop_().iterate(std::time::Duration::from_millis(10));
    }
    let serial_logged = serial_slot.load(AtomicOrdering::SeqCst);
    if serial_logged != 0 {
        info!("You can target this node with: target-object={} or target-object={}", serial_logged, config.node_name);
    } else {
        warn!("Node serial not observed yet in registry; it may appear a bit later.");
    }
    // Run main loop until told to stop
    info!("Starting main loop iteration...");
    let mut iteration_count = 0;
    while running.load(Ordering::Relaxed) {
        iteration_count += 1;
        if iteration_count % 1000 == 0 {
            info!("Main loop iteration: {}", iteration_count);
        }
        // Drive loop
        let result = mainloop.loop_().iterate(std::time::Duration::from_millis(16));
        if result < 0 {
            error!("Main loop iteration failed with result: {}", result);
            break;
        }
    }
    info!("Main loop exited after {} iterations", iteration_count);
}
/// Spawn the statistics worker thread and wire up its frame channel.
///
/// Frames pushed through `stats_frame_sender` are consumed by
/// `frame_processing_loop`, which updates `stats` and `last_frame_time`.
/// The worker winds down when `running` goes false or its channel
/// disconnects; its JoinHandle is intentionally detached.
fn start_frame_processor(&mut self) -> Result<()> {
    let (sender, receiver) = mpsc::channel();
    // Dedicated sender so stats delivery never contends with the
    // PipeWire frame path.
    self.stats_frame_sender = Some(sender);
    let running = Arc::clone(&self.running);
    let stats = Arc::clone(&self.stats);
    let last_frame_time = Arc::clone(&self.last_frame_time);
    thread::spawn(move || {
        Self::frame_processing_loop(receiver, running, stats, last_frame_time);
    });
    info!("Frame processing thread started");
    Ok(())
}
/// Statistics worker loop: drains queued frames and refreshes `stats`.
///
/// Uses a 100 ms receive timeout so the `running` flag is re-checked
/// roughly ten times a second even when no frames arrive.
fn frame_processing_loop(
    rx: Receiver<Vec<u8>>,
    running: Arc<AtomicBool>,
    stats: Arc<Mutex<VideoStats>>,
    last_frame_time: Arc<Mutex<Instant>>,
) {
    while running.load(Ordering::Relaxed) {
        match rx.recv_timeout(std::time::Duration::from_millis(100)) {
            Ok(frame_data) => {
                // Fold this frame into the shared counters / FPS estimate.
                Self::update_stats(&stats, &last_frame_time, frame_data.len());
                trace!("Frame processed: {} bytes", frame_data.len());
            }
            // Nothing this tick; loop around and re-check `running`.
            Err(mpsc::RecvTimeoutError::Timeout) => {}
            // All senders dropped: no more frames will ever arrive.
            Err(mpsc::RecvTimeoutError::Disconnected) => return,
        }
    }
}
/// Fold one processed frame into the shared statistics.
///
/// Increments the frame/byte counters, marks the backend ready, and
/// derives instantaneous FPS from the elapsed time since the previous
/// frame (the previous timestamp is replaced with `now`).
///
/// Fix: FPS was computed from `duration.as_millis()`, which truncated to
/// whole milliseconds (a 16.9 ms gap read as 16 ms -> 62.5 fps instead of
/// ~59) and skipped the update entirely for gaps under 1 ms. Computing
/// from `as_secs_f64()` keeps full precision.
fn update_stats(
    stats: &Arc<Mutex<VideoStats>>,
    last_frame_time: &Arc<Mutex<Instant>>,
    frame_size: usize,
) {
    let mut stats_guard = stats.lock().unwrap();
    stats_guard.frames_pushed += 1;
    stats_guard.total_bytes += frame_size as u64;
    stats_guard.backend_type = super::VideoBackendType::PipeWire;
    stats_guard.is_ready = true;
    let now = Instant::now();
    let mut last_time = last_frame_time.lock().unwrap();
    let duration = now.duration_since(*last_time);
    *last_time = now;
    let secs = duration.as_secs_f64();
    // Guard against a zero interval (two frames in the same instant).
    if secs > 0.0 {
        stats_guard.fps = 1.0 / secs;
    }
}
/// Report identifying information for external tools.
///
/// Returns `(node_id, object_serial, node_name)`. The serial is not yet
/// plumbed out of the PipeWire thread, so it is always reported as 0,
/// and the node id may still be the placeholder set at creation time.
pub fn get_node_info(&self) -> Option<(u32, u32, String)> {
    let node_id = self.virtual_node_id.unwrap_or(0);
    let object_serial = 0; // object.serial - would need to be stored from the stream
    let name = self.config.node_name.clone();
    Some((node_id, object_serial, name))
}
/// Get the object.serial for targeting (if available)
///
/// Always `None` for now: the serial is discovered (and logged) by the
/// registry listener on the PipeWire thread but is never shared back to
/// this struct. Plumbing it out would need a shared atomic/mutex slot.
pub fn get_object_serial(&self) -> Option<u32> {
    // This would need to be implemented with proper synchronization
    // For now, return None - the serial is logged when discovered
    None
}
/// Check if the virtual camera node is registered and discoverable.
///
/// True once `initialize()` succeeded and the worker threads are still
/// kept alive by the `running` flag. Both checks are pure reads.
pub fn is_node_registered(&self) -> bool {
    let alive = self.running.load(Ordering::Relaxed);
    self.is_initialized && alive
}
}
impl VideoBackendTrait for PipeWireBackend {
/// Bring the backend up: availability check, PipeWire thread + virtual
/// node creation, then the stats worker. Idempotent - returns Ok
/// immediately if already initialized. Any step failing is mapped to
/// `VideoError::DeviceNotReady` (the underlying cause is logged).
fn initialize(&mut self) -> Result<()> {
    if self.is_initialized {
        return Ok(());
    }
    info!("Initializing PipeWire backend with native library...");
    if let Err(e) = self.check_pipewire_available() {
        error!("PipeWire not available: {}", e);
        return Err(VideoError::DeviceNotReady.into());
    }
    // Spawns the PipeWire thread; actual node readiness is reported
    // asynchronously from that thread's logs.
    if let Err(e) = self.create_virtual_camera_node() {
        error!("Failed to create virtual camera node: {}", e);
        return Err(VideoError::DeviceNotReady.into());
    }
    if let Err(e) = self.start_frame_processor() {
        error!("Failed to start frame processor: {}", e);
        return Err(VideoError::DeviceNotReady.into());
    }
    self.is_initialized = true;
    // Redundant with create_virtual_camera_node's store, but harmless.
    self.running.store(true, Ordering::Relaxed);
    info!("PipeWire backend initialized successfully with native library");
    // Remove premature logging - the actual node creation will be logged in the PipeWire thread
    // when the node is actually available
    Ok(())
}
/// Queue one encoded frame for delivery to PipeWire and to the stats
/// worker. The frame bytes are copied once per consumer; a stats-side
/// send failure is logged but does not fail the call.
fn push_frame(&self, frame_data: &[u8]) -> Result<()> {
    // Refuse frames until initialize() has run and while shutting down.
    // Both guards are pure reads and map to the same error.
    if !self.running.load(Ordering::Relaxed) || !self.is_initialized {
        return Err(VideoError::DeviceNotReady.into());
    }
    trace!("Queueing frame for PipeWire: {} bytes", frame_data.len());
    // Send to PipeWire thread
    let Some(sender) = self.pw_frame_sender.as_ref() else {
        error!("PipeWire frame sender not available");
        return Err(VideoError::DeviceNotReady.into());
    };
    if let Err(e) = sender.send(frame_data.to_vec()) {
        error!("Failed to queue frame to PipeWire: {}", e);
        return Err(VideoError::DeviceNotReady.into());
    }
    debug!("Frame queued for PipeWire processing: {} bytes", frame_data.len());
    // Send to stats thread
    if let Some(sender) = self.stats_frame_sender.as_ref() {
        if let Err(e) = sender.send(frame_data.to_vec()) {
            error!("Failed to queue frame to stats: {}", e);
            // Don't fail the entire operation for stats
        }
    }
    trace!("Frame queued successfully");
    Ok(())
}
/// Snapshot the current statistics (cloned out from under the lock).
fn get_stats(&self) -> VideoStats {
    let guard = self.stats.lock().unwrap();
    guard.clone()
}
/// Ready once initialized and not (yet) shut down.
fn is_ready(&self) -> bool {
    let alive = self.running.load(Ordering::Relaxed);
    self.is_initialized && alive
}
/// Stop the worker threads, drop the frame channels, and reset state.
/// Idempotent: calling it when not initialized is a no-op.
fn shutdown(&mut self) -> Result<()> {
    if !self.is_initialized {
        return Ok(());
    }
    info!("Shutting down PipeWire backend...");
    // Signal every loop that polls this flag to wind down.
    self.running.store(false, Ordering::Relaxed);
    // Join the PipeWire thread so its resources are released before the
    // channels it reads from are torn down.
    match self.pw_thread.take() {
        Some(handle) => {
            if let Err(e) = handle.join() {
                error!("Error joining PipeWire thread: {:?}", e);
            }
        }
        None => {}
    }
    self.virtual_node_id = None;
    self.pw_frame_sender = None;
    self.stats_frame_sender = None;
    self.is_initialized = false;
    info!("PipeWire backend shut down successfully");
    Ok(())
}
}
impl Drop for PipeWireBackend {
    /// Best-effort cleanup if `shutdown()` was never called: signal the
    /// workers to stop and wait for the PipeWire thread. Channel senders
    /// are released by the normal field drops.
    fn drop(&mut self) {
        self.running.store(false, Ordering::Relaxed);
        let handle = self.pw_thread.take();
        if let Some(handle) = handle {
            // A panicked worker is deliberately ignored; drop must not panic.
            let _ = handle.join();
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    // Verifies that a backend built from the default config carries the
    // expected identity strings and 640x480 MJPEG @ 30 fps video mode.
    #[test]
    fn test_pipewire_backend_creation() {
        let config = PipeWireConfig::default();
        let backend = PipeWireBackend::new(config);
        assert_eq!(backend.config.node_name, "geek-szitman-supercamera");
        assert_eq!(backend.config.description, "Geek Szitman SuperCamera - High-quality virtual camera for streaming and recording");
        assert_eq!(backend.config.media_class, "Video/Source");
        assert_eq!(backend.config.format, VideoFormat::MJPEG);
        assert_eq!(backend.config.width, 640);
        assert_eq!(backend.config.height, 480);
        assert_eq!(backend.config.framerate, 30);
    }
    // A freshly constructed backend must not report itself initialized
    // or ready before initialize() is called.
    #[test]
    fn test_pipewire_backend_default_config() {
        let config = PipeWireConfig::default();
        let backend = PipeWireBackend::new(config);
        assert!(!backend.is_initialized);
        assert!(!backend.is_ready());
    }
}
|