diff --git a/src/stream/pipeline/fake_pipeline.rs b/src/stream/pipeline/fake_pipeline.rs index 8928a654..5805c69f 100644 --- a/src/stream/pipeline/fake_pipeline.rs +++ b/src/stream/pipeline/fake_pipeline.rs @@ -8,7 +8,8 @@ use crate::{ }; use super::{ - PipelineGstreamerInterface, PipelineState, PIPELINE_FILTER_NAME, PIPELINE_SINK_TEE_NAME, + PipelineGstreamerInterface, PipelineState, PIPELINE_FILTER_NAME, PIPELINE_RTP_TEE_NAME, + PIPELINE_VIDEO_TEE_NAME, }; use anyhow::{anyhow, Result}; @@ -57,7 +58,8 @@ impl FakePipeline { }; let filter_name = format!("{PIPELINE_FILTER_NAME}-{pipeline_id}"); - let sink_tee_name = format!("{PIPELINE_SINK_TEE_NAME}-{pipeline_id}"); + let video_tee_name = format!("{PIPELINE_VIDEO_TEE_NAME}-{pipeline_id}"); + let rtp_tee_name = format!("{PIPELINE_RTP_TEE_NAME}-{pipeline_id}"); // Fakes (videotestsrc) are only "video/x-raw" or "video/x-bayer", // and to be able to encode it, we need to define an available @@ -74,8 +76,9 @@ impl FakePipeline { " ! x264enc tune=zerolatency speed-preset=ultrafast bitrate=5000", " ! h264parse", " ! capsfilter name={filter_name} caps=video/x-h264,profile={profile},stream-format=avc,alignment=au,width={width},height={height},framerate={interval_denominator}/{interval_numerator}", + " ! tee name={video_tee_name} allow-not-linked=true", " ! rtph264pay aggregate-mode=zero-latency config-interval=10 pt=96", - " ! tee name={sink_tee_name} allow-not-linked=true" + " ! tee name={rtp_tee_name} allow-not-linked=true" ), pattern = pattern, profile = "constrained-baseline", @@ -84,7 +87,8 @@ impl FakePipeline { interval_denominator = configuration.frame_interval.denominator, interval_numerator = configuration.frame_interval.numerator, filter_name = filter_name, - sink_tee_name = sink_tee_name, + video_tee_name = video_tee_name, + rtp_tee_name = rtp_tee_name, ) } VideoEncodeType::Yuyv => { @@ -97,8 +101,9 @@ impl FakePipeline { " ! timeoverlay", " ! video/x-raw,format=I420", " ! capsfilter name={filter_name} caps=video/x-raw,format=I420,width={width},height={height},framerate={interval_denominator}/{interval_numerator}", + " ! tee name={video_tee_name} allow-not-linked=true", " ! rtpvrawpay pt=96", - " ! tee name={sink_tee_name} allow-not-linked=true", + " ! tee name={rtp_tee_name} allow-not-linked=true", ), pattern = pattern, width = configuration.width, @@ -106,7 +111,8 @@ impl FakePipeline { interval_denominator = configuration.frame_interval.denominator, interval_numerator = configuration.frame_interval.numerator, filter_name = filter_name, - sink_tee_name = sink_tee_name, + video_tee_name = video_tee_name, + rtp_tee_name = rtp_tee_name, ) } VideoEncodeType::Mjpg => { @@ -117,8 +123,9 @@ impl FakePipeline { " ! video/x-raw,format=I420", " ! jpegenc quality=85 idct-method=1", " ! capsfilter name={filter_name} caps=image/jpeg,width={width},height={height},framerate={interval_denominator}/{interval_numerator}", + " ! tee name={video_tee_name} allow-not-linked=true", " ! rtpjpegpay pt=96", - " ! tee name={sink_tee_name} allow-not-linked=true", + " ! tee name={rtp_tee_name} allow-not-linked=true", ), pattern = pattern, width = configuration.width, @@ -126,7 +133,8 @@ impl FakePipeline { interval_denominator = configuration.frame_interval.denominator, interval_numerator = configuration.frame_interval.numerator, filter_name = filter_name, - sink_tee_name = sink_tee_name, + video_tee_name = video_tee_name, + rtp_tee_name = rtp_tee_name, ) } unsupported => { diff --git a/src/stream/pipeline/mod.rs b/src/stream/pipeline/mod.rs index 56de85b6..8e790269 100644 --- a/src/stream/pipeline/mod.rs +++ b/src/stream/pipeline/mod.rs @@ -93,12 +93,14 @@ impl Pipeline { pub struct PipelineState { pub pipeline_id: uuid::Uuid, pub pipeline: gst::Pipeline, - pub sink_tee: gst::Element, + pub video_tee: Option, + pub rtp_tee: Option, pub sinks: HashMap, pub pipeline_runner: PipelineRunner, } -pub const PIPELINE_SINK_TEE_NAME: &str = "SinkTee"; +pub const PIPELINE_RTP_TEE_NAME: &str = "RTPTee"; +pub const PIPELINE_VIDEO_TEE_NAME: &str = "VideoTee"; pub const PIPELINE_FILTER_NAME: &str = "Filter"; impl PipelineState { @@ -119,9 +121,9 @@ impl PipelineState { } }?; - let sink_tee = pipeline - .by_name(&format!("{PIPELINE_SINK_TEE_NAME}-{pipeline_id}")) - .context(format!("no element named {PIPELINE_SINK_TEE_NAME:#?}"))?; + let video_tee = pipeline.by_name(&format!("{PIPELINE_VIDEO_TEE_NAME}-{pipeline_id}")); + + let rtp_tee = pipeline.by_name(&format!("{PIPELINE_RTP_TEE_NAME}-{pipeline_id}")); let pipeline_runner = PipelineRunner::try_new(&pipeline, pipeline_id, false)?; @@ -133,7 +135,8 @@ impl PipelineState { Ok(Self { pipeline_id: *pipeline_id, pipeline, - sink_tee, + video_tee, + rtp_tee, sinks: Default::default(), pipeline_runner, }) @@ -144,8 +147,18 @@ impl PipelineState { pub fn add_sink(&mut self, mut sink: Sink) -> Result<()> { let pipeline_id = &self.pipeline_id; - // Request a new src pad for the Tee - let tee_src_pad = self.sink_tee.request_pad_simple("src_%u").context(format!( + // Request a new src pad for the used Tee + // Note: Here we choose if the sink will receive a Video or RTP packages + let tee = match sink { + Sink::Image(_) => &self.video_tee, + Sink::Udp(_) | Sink::Rtsp(_) | Sink::WebRTC(_) => &self.rtp_tee, + }; + + let Some(tee) = tee else { + return Err(anyhow!("No Tee for this kind of Pipeline")); + }; + + let tee_src_pad = tee.request_pad_simple("src_%u").context(format!( "Failed requesting src pad for Tee of the pipeline {pipeline_id}" ))?; debug!("Got tee's src pad {:#?}", tee_src_pad.name()); @@ -165,9 +178,12 @@ impl PipelineState { } } - if let Err(error) = - wait_for_element_state(pipeline.downgrade(), gst::State::Playing, 100, 2) - { + if let Err(error) = wait_for_element_state( + gst::prelude::ObjectExt::downgrade(pipeline), + gst::State::Playing, + 100, + 2, + ) { let _ = pipeline.set_state(gst::State::Null); sink.unlink(pipeline, pipeline_id)?; return Err(anyhow!( @@ -176,21 +192,22 @@ impl PipelineState { } if let Sink::Rtsp(sink) = &sink { - let caps = &self - .sink_tee - .static_pad("sink") - .expect("No static sink pad found on capsfilter") - .current_caps() - .context("Failed to get caps from capsfilter sink pad")?; + if let Some(rtp_tee) = &self.rtp_tee { + let caps = &rtp_tee + .static_pad("sink") + .expect("No static sink pad found on capsfilter") + .current_caps() + .context("Failed to get caps from capsfilter sink pad")?; - debug!("caps: {:#?}", caps.to_string()); + debug!("caps: {:#?}", caps.to_string()); - // In case it exisits, try to remove it first, but skip the result - let _ = RTSPServer::stop_pipeline(&sink.path()); + // In case it exisits, try to remove it first, but skip the result + let _ = RTSPServer::stop_pipeline(&sink.path()); - RTSPServer::add_pipeline(&sink.path(), &sink.socket_path(), caps)?; + RTSPServer::add_pipeline(&sink.path(), &sink.socket_path(), caps)?; - RTSPServer::start_pipeline(&sink.path())?; + RTSPServer::start_pipeline(&sink.path())?; + } } // Skipping ImageSink syncronization because it goes to some wrong state, @@ -239,15 +256,13 @@ impl PipelineState { .iter() .any(|child| child.name().starts_with("rtspsrc")) { - let sink_name = format!("{PIPELINE_SINK_TEE_NAME}-{pipeline_id}"); - let tee = pipeline - .by_name(&sink_name) - .context(format!("no element named {sink_name:#?}"))?; - if tee.src_pads().is_empty() { - if let Err(error) = pipeline.set_state(gst::State::Null) { - return Err(anyhow!( - "Failed to change state of Pipeline {pipeline_id} to NULL. Reason: {error}" - )); + if let Some(rtp_tee) = &self.rtp_tee { + if rtp_tee.src_pads().is_empty() { + if let Err(error) = pipeline.set_state(gst::State::Null) { + return Err(anyhow!( + "Failed to change state of Pipeline {pipeline_id} to NULL. Reason: {error}" + )); + } } } } diff --git a/src/stream/pipeline/redirect_pipeline.rs b/src/stream/pipeline/redirect_pipeline.rs index 3ea53114..02746c17 100644 --- a/src/stream/pipeline/redirect_pipeline.rs +++ b/src/stream/pipeline/redirect_pipeline.rs @@ -3,7 +3,7 @@ use crate::{ video_stream::types::VideoAndStreamInformation, }; -use super::{PipelineGstreamerInterface, PipelineState, PIPELINE_SINK_TEE_NAME}; +use super::{PipelineGstreamerInterface, PipelineState, PIPELINE_RTP_TEE_NAME}; use anyhow::{anyhow, Context, Result}; @@ -57,7 +57,7 @@ impl RedirectPipeline { .first() .context("Failed to access the fisrt endpoint")?; - let sink_tee_name = format!("{PIPELINE_SINK_TEE_NAME}-{pipeline_id}"); + let sink_tee_name = format!("{PIPELINE_RTP_TEE_NAME}-{pipeline_id}"); let description = match url.scheme() { "rtsp" => { diff --git a/src/stream/pipeline/v4l_pipeline.rs b/src/stream/pipeline/v4l_pipeline.rs index 465279fe..760cef58 100644 --- a/src/stream/pipeline/v4l_pipeline.rs +++ b/src/stream/pipeline/v4l_pipeline.rs @@ -5,7 +5,8 @@ use crate::{ }; use super::{ - PipelineGstreamerInterface, PipelineState, PIPELINE_FILTER_NAME, PIPELINE_SINK_TEE_NAME, + PipelineGstreamerInterface, PipelineState, PIPELINE_FILTER_NAME, PIPELINE_RTP_TEE_NAME, + PIPELINE_VIDEO_TEE_NAME, }; use anyhow::{anyhow, Result}; @@ -48,7 +49,8 @@ impl V4lPipeline { let interval_numerator = configuration.frame_interval.numerator; let interval_denominator = configuration.frame_interval.denominator; let filter_name = format!("{PIPELINE_FILTER_NAME}-{pipeline_id}"); - let sink_tee_name = format!("{PIPELINE_SINK_TEE_NAME}-{pipeline_id}"); + let video_tee_name = format!("{PIPELINE_VIDEO_TEE_NAME}-{pipeline_id}"); + let rtp_tee_name = format!("{PIPELINE_RTP_TEE_NAME}-{pipeline_id}"); let description = match &configuration.encode { VideoEncodeType::H264 => { @@ -57,8 +59,9 @@ impl V4lPipeline { "v4l2src device={device} do-timestamp=false", " ! h264parse", // Here we need the parse to help the stream-format and alignment part, which is being fixed here because avc/au seems to reduce the CPU usage in the RTP payloading part. " ! capsfilter name={filter_name} caps=video/x-h264,stream-format=avc,alignment=au,width={width},height={height},framerate={interval_denominator}/{interval_numerator}", + " ! tee name={video_tee_name} allow-not-linked=true", " ! rtph264pay aggregate-mode=zero-latency config-interval=10 pt=96", - " ! tee name={sink_tee_name} allow-not-linked=true" + " ! tee name={rtp_tee_name} allow-not-linked=true" ), device = device, width = width, @@ -66,7 +69,8 @@ impl V4lPipeline { interval_denominator = interval_denominator, interval_numerator = interval_numerator, filter_name = filter_name, - sink_tee_name = sink_tee_name, + video_tee_name = video_tee_name, + rtp_tee_name = rtp_tee_name, ) } VideoEncodeType::Yuyv => { @@ -75,8 +79,9 @@ impl V4lPipeline { "v4l2src device={device} do-timestamp=false", " ! videoconvert", " ! capsfilter name={filter_name} caps=video/x-raw,format=I420,width={width},height={height},framerate={interval_denominator}/{interval_numerator}", + " ! tee name={video_tee_name} allow-not-linked=true", " ! rtpvrawpay pt=96", - " ! tee name={sink_tee_name} allow-not-linked=true" + " ! tee name={rtp_tee_name} allow-not-linked=true" ), device = device, width = width, @@ -84,7 +89,8 @@ impl V4lPipeline { interval_denominator = interval_denominator, interval_numerator = interval_numerator, filter_name = filter_name, - sink_tee_name = sink_tee_name + video_tee_name = video_tee_name, + rtp_tee_name = rtp_tee_name, ) } VideoEncodeType::Mjpg => { @@ -93,8 +99,9 @@ impl V4lPipeline { "v4l2src device={device} do-timestamp=false", // We don't need a jpegparse, as it leads to incompatible caps, spoiling the negotiation. " ! capsfilter name={filter_name} caps=image/jpeg,width={width},height={height},framerate={interval_denominator}/{interval_numerator}", + " ! tee name={video_tee_name} allow-not-linked=true", " ! rtpjpegpay pt=96", - " ! tee name={sink_tee_name} allow-not-linked=true" + " ! tee name={rtp_tee_name} allow-not-linked=true" ), device = device, width = width, @@ -102,7 +109,8 @@ impl V4lPipeline { interval_denominator = interval_denominator, interval_numerator = interval_numerator, filter_name = filter_name, - sink_tee_name = sink_tee_name + video_tee_name = video_tee_name, + rtp_tee_name = rtp_tee_name, ) } unsupported => { diff --git a/src/stream/sink/image_sink.rs b/src/stream/sink/image_sink.rs index 49ccc724..941f6403 100644 --- a/src/stream/sink/image_sink.rs +++ b/src/stream/sink/image_sink.rs @@ -332,8 +332,6 @@ impl ImageSink { let mut _transcoding_elements: Vec = Default::default(); match encoding { VideoEncodeType::H264 => { - let depayloader = gst::ElementFactory::make("rtph264depay").build()?; - let parser = gst::ElementFactory::make("h264parse").build()?; // For h264, we need to filter-out unwanted non-key frames here, before decoding it. let filter = gst::ElementFactory::make("identity") .property("drop-buffer-flags", gst::BufferFlags::DELTA_UNIT) @@ -343,24 +341,15 @@ impl ImageSink { .property_from_str("lowres", "2") // (0) is 'full'; (1) is '1/2-size'; (2) is '1/4-size' .build()?; decoder.has_property("discard-corrupted-frames", None).then(|| decoder.set_property("discard-corrupted-frames", true)); - _transcoding_elements.push(depayloader); - _transcoding_elements.push(parser); _transcoding_elements.push(filter); _transcoding_elements.push(decoder); } VideoEncodeType::Mjpg => { - let depayloader = gst::ElementFactory::make("rtpjpegdepay").build()?; - let parser = gst::ElementFactory::make("jpegparse").build()?; let decoder = gst::ElementFactory::make("jpegdec").build()?; decoder.has_property("discard-corrupted-frames", None).then(|| decoder.set_property("discard-corrupted-frames", true)); - _transcoding_elements.push(depayloader); - _transcoding_elements.push(parser); _transcoding_elements.push(decoder); } - VideoEncodeType::Yuyv => { - let depayloader = gst::ElementFactory::make("rtpvrawdepay").build()?; - _transcoding_elements.push(depayloader); - } + VideoEncodeType::Yuyv => {} _ => return Err(anyhow!("Unsupported video encoding for ImageSink: {encoding:?}. The supported are: H264, MJPG and YUYV")), };