Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add video tee #331

Merged
merged 2 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 16 additions & 8 deletions src/stream/pipeline/fake_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ use crate::{
};

use super::{
PipelineGstreamerInterface, PipelineState, PIPELINE_FILTER_NAME, PIPELINE_SINK_TEE_NAME,
PipelineGstreamerInterface, PipelineState, PIPELINE_FILTER_NAME, PIPELINE_RTP_TEE_NAME,
PIPELINE_VIDEO_TEE_NAME,
};

use anyhow::{anyhow, Result};
Expand Down Expand Up @@ -57,7 +58,8 @@ impl FakePipeline {
};

let filter_name = format!("{PIPELINE_FILTER_NAME}-{pipeline_id}");
let sink_tee_name = format!("{PIPELINE_SINK_TEE_NAME}-{pipeline_id}");
let video_tee_name = format!("{PIPELINE_VIDEO_TEE_NAME}-{pipeline_id}");
let rtp_tee_name = format!("{PIPELINE_RTP_TEE_NAME}-{pipeline_id}");

// Fakes (videotestsrc) are only "video/x-raw" or "video/x-bayer",
// and to be able to encode it, we need to define an available
Expand All @@ -74,8 +76,9 @@ impl FakePipeline {
" ! x264enc tune=zerolatency speed-preset=ultrafast bitrate=5000",
" ! h264parse",
" ! capsfilter name={filter_name} caps=video/x-h264,profile={profile},stream-format=avc,alignment=au,width={width},height={height},framerate={interval_denominator}/{interval_numerator}",
" ! tee name={video_tee_name} allow-not-linked=true",
" ! rtph264pay aggregate-mode=zero-latency config-interval=10 pt=96",
" ! tee name={sink_tee_name} allow-not-linked=true"
" ! tee name={rtp_tee_name} allow-not-linked=true"
),
pattern = pattern,
profile = "constrained-baseline",
Expand All @@ -84,7 +87,8 @@ impl FakePipeline {
interval_denominator = configuration.frame_interval.denominator,
interval_numerator = configuration.frame_interval.numerator,
filter_name = filter_name,
sink_tee_name = sink_tee_name,
video_tee_name = video_tee_name,
rtp_tee_name = rtp_tee_name,
)
}
VideoEncodeType::Yuyv => {
Expand All @@ -97,16 +101,18 @@ impl FakePipeline {
" ! timeoverlay",
" ! video/x-raw,format=I420",
" ! capsfilter name={filter_name} caps=video/x-raw,format=I420,width={width},height={height},framerate={interval_denominator}/{interval_numerator}",
" ! tee name={video_tee_name} allow-not-linked=true",
" ! rtpvrawpay pt=96",
" ! tee name={sink_tee_name} allow-not-linked=true",
" ! tee name={rtp_tee_name} allow-not-linked=true",
),
pattern = pattern,
width = configuration.width,
height = configuration.height,
interval_denominator = configuration.frame_interval.denominator,
interval_numerator = configuration.frame_interval.numerator,
filter_name = filter_name,
sink_tee_name = sink_tee_name,
video_tee_name = video_tee_name,
rtp_tee_name = rtp_tee_name,
)
}
VideoEncodeType::Mjpg => {
Expand All @@ -117,16 +123,18 @@ impl FakePipeline {
" ! video/x-raw,format=I420",
" ! jpegenc quality=85 idct-method=1",
" ! capsfilter name={filter_name} caps=image/jpeg,width={width},height={height},framerate={interval_denominator}/{interval_numerator}",
" ! tee name={video_tee_name} allow-not-linked=true",
" ! rtpjpegpay pt=96",
" ! tee name={sink_tee_name} allow-not-linked=true",
" ! tee name={rtp_tee_name} allow-not-linked=true",
),
pattern = pattern,
width = configuration.width,
height = configuration.height,
interval_denominator = configuration.frame_interval.denominator,
interval_numerator = configuration.frame_interval.numerator,
filter_name = filter_name,
sink_tee_name = sink_tee_name,
video_tee_name = video_tee_name,
rtp_tee_name = rtp_tee_name,
)
}
unsupported => {
Expand Down
77 changes: 46 additions & 31 deletions src/stream/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,14 @@ impl Pipeline {
pub struct PipelineState {
pub pipeline_id: uuid::Uuid,
pub pipeline: gst::Pipeline,
pub sink_tee: gst::Element,
pub video_tee: Option<gst::Element>,
pub rtp_tee: Option<gst::Element>,
pub sinks: HashMap<uuid::Uuid, Sink>,
pub pipeline_runner: PipelineRunner,
}

pub const PIPELINE_SINK_TEE_NAME: &str = "SinkTee";
pub const PIPELINE_RTP_TEE_NAME: &str = "RTPTee";
pub const PIPELINE_VIDEO_TEE_NAME: &str = "VideoTee";
pub const PIPELINE_FILTER_NAME: &str = "Filter";

impl PipelineState {
Expand All @@ -119,9 +121,9 @@ impl PipelineState {
}
}?;

let sink_tee = pipeline
.by_name(&format!("{PIPELINE_SINK_TEE_NAME}-{pipeline_id}"))
.context(format!("no element named {PIPELINE_SINK_TEE_NAME:#?}"))?;
let video_tee = pipeline.by_name(&format!("{PIPELINE_VIDEO_TEE_NAME}-{pipeline_id}"));

let rtp_tee = pipeline.by_name(&format!("{PIPELINE_RTP_TEE_NAME}-{pipeline_id}"));

let pipeline_runner = PipelineRunner::try_new(&pipeline, pipeline_id, false)?;

Expand All @@ -133,7 +135,8 @@ impl PipelineState {
Ok(Self {
pipeline_id: *pipeline_id,
pipeline,
sink_tee,
video_tee,
rtp_tee,
sinks: Default::default(),
pipeline_runner,
})
Expand All @@ -144,8 +147,18 @@ impl PipelineState {
pub fn add_sink(&mut self, mut sink: Sink) -> Result<()> {
let pipeline_id = &self.pipeline_id;

// Request a new src pad for the Tee
let tee_src_pad = self.sink_tee.request_pad_simple("src_%u").context(format!(
// Request a new src pad for the used Tee
// Note: Here we choose if the sink will receive a Video or RTP packages
let tee = match sink {
Sink::Image(_) => &self.video_tee,
Sink::Udp(_) | Sink::Rtsp(_) | Sink::WebRTC(_) => &self.rtp_tee,
};

let Some(tee) = tee else {
return Err(anyhow!("No Tee for this kind of Pipeline"));
};

let tee_src_pad = tee.request_pad_simple("src_%u").context(format!(
"Failed requesting src pad for Tee of the pipeline {pipeline_id}"
))?;
debug!("Got tee's src pad {:#?}", tee_src_pad.name());
Expand All @@ -165,9 +178,12 @@ impl PipelineState {
}
}

if let Err(error) =
wait_for_element_state(pipeline.downgrade(), gst::State::Playing, 100, 2)
{
if let Err(error) = wait_for_element_state(
gst::prelude::ObjectExt::downgrade(pipeline),
gst::State::Playing,
100,
2,
) {
let _ = pipeline.set_state(gst::State::Null);
sink.unlink(pipeline, pipeline_id)?;
return Err(anyhow!(
Expand All @@ -176,21 +192,22 @@ impl PipelineState {
}

if let Sink::Rtsp(sink) = &sink {
let caps = &self
.sink_tee
.static_pad("sink")
.expect("No static sink pad found on capsfilter")
.current_caps()
.context("Failed to get caps from capsfilter sink pad")?;
if let Some(rtp_tee) = &self.rtp_tee {
let caps = &rtp_tee
.static_pad("sink")
.expect("No static sink pad found on capsfilter")
.current_caps()
.context("Failed to get caps from capsfilter sink pad")?;

debug!("caps: {:#?}", caps.to_string());
debug!("caps: {:#?}", caps.to_string());

// In case it exisits, try to remove it first, but skip the result
let _ = RTSPServer::stop_pipeline(&sink.path());
// In case it exisits, try to remove it first, but skip the result
let _ = RTSPServer::stop_pipeline(&sink.path());

RTSPServer::add_pipeline(&sink.path(), &sink.socket_path(), caps)?;
RTSPServer::add_pipeline(&sink.path(), &sink.socket_path(), caps)?;

RTSPServer::start_pipeline(&sink.path())?;
RTSPServer::start_pipeline(&sink.path())?;
}
}

// Skipping ImageSink syncronization because it goes to some wrong state,
Expand Down Expand Up @@ -239,15 +256,13 @@ impl PipelineState {
.iter()
.any(|child| child.name().starts_with("rtspsrc"))
{
let sink_name = format!("{PIPELINE_SINK_TEE_NAME}-{pipeline_id}");
let tee = pipeline
.by_name(&sink_name)
.context(format!("no element named {sink_name:#?}"))?;
if tee.src_pads().is_empty() {
if let Err(error) = pipeline.set_state(gst::State::Null) {
return Err(anyhow!(
"Failed to change state of Pipeline {pipeline_id} to NULL. Reason: {error}"
));
if let Some(rtp_tee) = &self.rtp_tee {
if rtp_tee.src_pads().is_empty() {
if let Err(error) = pipeline.set_state(gst::State::Null) {
return Err(anyhow!(
"Failed to change state of Pipeline {pipeline_id} to NULL. Reason: {error}"
));
}
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/stream/pipeline/redirect_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{
video_stream::types::VideoAndStreamInformation,
};

use super::{PipelineGstreamerInterface, PipelineState, PIPELINE_SINK_TEE_NAME};
use super::{PipelineGstreamerInterface, PipelineState, PIPELINE_RTP_TEE_NAME};

use anyhow::{anyhow, Context, Result};

Expand Down Expand Up @@ -57,7 +57,7 @@ impl RedirectPipeline {
.first()
.context("Failed to access the fisrt endpoint")?;

let sink_tee_name = format!("{PIPELINE_SINK_TEE_NAME}-{pipeline_id}");
let sink_tee_name = format!("{PIPELINE_RTP_TEE_NAME}-{pipeline_id}");

let description = match url.scheme() {
"rtsp" => {
Expand Down
24 changes: 16 additions & 8 deletions src/stream/pipeline/v4l_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use crate::{
};

use super::{
PipelineGstreamerInterface, PipelineState, PIPELINE_FILTER_NAME, PIPELINE_SINK_TEE_NAME,
PipelineGstreamerInterface, PipelineState, PIPELINE_FILTER_NAME, PIPELINE_RTP_TEE_NAME,
PIPELINE_VIDEO_TEE_NAME,
};

use anyhow::{anyhow, Result};
Expand Down Expand Up @@ -48,7 +49,8 @@ impl V4lPipeline {
let interval_numerator = configuration.frame_interval.numerator;
let interval_denominator = configuration.frame_interval.denominator;
let filter_name = format!("{PIPELINE_FILTER_NAME}-{pipeline_id}");
let sink_tee_name = format!("{PIPELINE_SINK_TEE_NAME}-{pipeline_id}");
let video_tee_name = format!("{PIPELINE_VIDEO_TEE_NAME}-{pipeline_id}");
let rtp_tee_name = format!("{PIPELINE_RTP_TEE_NAME}-{pipeline_id}");

let description = match &configuration.encode {
VideoEncodeType::H264 => {
Expand All @@ -57,16 +59,18 @@ impl V4lPipeline {
"v4l2src device={device} do-timestamp=false",
" ! h264parse", // Here we need the parse to help the stream-format and alignment part, which is being fixed here because avc/au seems to reduce the CPU usage in the RTP payloading part.
" ! capsfilter name={filter_name} caps=video/x-h264,stream-format=avc,alignment=au,width={width},height={height},framerate={interval_denominator}/{interval_numerator}",
" ! tee name={video_tee_name} allow-not-linked=true",
" ! rtph264pay aggregate-mode=zero-latency config-interval=10 pt=96",
" ! tee name={sink_tee_name} allow-not-linked=true"
" ! tee name={rtp_tee_name} allow-not-linked=true"
),
device = device,
width = width,
height = height,
interval_denominator = interval_denominator,
interval_numerator = interval_numerator,
filter_name = filter_name,
sink_tee_name = sink_tee_name,
video_tee_name = video_tee_name,
rtp_tee_name = rtp_tee_name,
)
}
VideoEncodeType::Yuyv => {
Expand All @@ -75,16 +79,18 @@ impl V4lPipeline {
"v4l2src device={device} do-timestamp=false",
" ! videoconvert",
" ! capsfilter name={filter_name} caps=video/x-raw,format=I420,width={width},height={height},framerate={interval_denominator}/{interval_numerator}",
" ! tee name={video_tee_name} allow-not-linked=true",
" ! rtpvrawpay pt=96",
" ! tee name={sink_tee_name} allow-not-linked=true"
" ! tee name={rtp_tee_name} allow-not-linked=true"
),
device = device,
width = width,
height = height,
interval_denominator = interval_denominator,
interval_numerator = interval_numerator,
filter_name = filter_name,
sink_tee_name = sink_tee_name
video_tee_name = video_tee_name,
rtp_tee_name = rtp_tee_name,
)
}
VideoEncodeType::Mjpg => {
Expand All @@ -93,16 +99,18 @@ impl V4lPipeline {
"v4l2src device={device} do-timestamp=false",
// We don't need a jpegparse, as it leads to incompatible caps, spoiling the negotiation.
" ! capsfilter name={filter_name} caps=image/jpeg,width={width},height={height},framerate={interval_denominator}/{interval_numerator}",
" ! tee name={video_tee_name} allow-not-linked=true",
" ! rtpjpegpay pt=96",
" ! tee name={sink_tee_name} allow-not-linked=true"
" ! tee name={rtp_tee_name} allow-not-linked=true"
),
device = device,
width = width,
height = height,
interval_denominator = interval_denominator,
interval_numerator = interval_numerator,
filter_name = filter_name,
sink_tee_name = sink_tee_name
video_tee_name = video_tee_name,
rtp_tee_name = rtp_tee_name,
)
}
unsupported => {
Expand Down
13 changes: 1 addition & 12 deletions src/stream/sink/image_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,8 +332,6 @@ impl ImageSink {
let mut _transcoding_elements: Vec<gst::Element> = Default::default();
match encoding {
VideoEncodeType::H264 => {
let depayloader = gst::ElementFactory::make("rtph264depay").build()?;
let parser = gst::ElementFactory::make("h264parse").build()?;
// For h264, we need to filter-out unwanted non-key frames here, before decoding it.
let filter = gst::ElementFactory::make("identity")
.property("drop-buffer-flags", gst::BufferFlags::DELTA_UNIT)
Expand All @@ -343,24 +341,15 @@ impl ImageSink {
.property_from_str("lowres", "2") // (0) is 'full'; (1) is '1/2-size'; (2) is '1/4-size'
.build()?;
decoder.has_property("discard-corrupted-frames", None).then(|| decoder.set_property("discard-corrupted-frames", true));
_transcoding_elements.push(depayloader);
_transcoding_elements.push(parser);
_transcoding_elements.push(filter);
_transcoding_elements.push(decoder);
}
VideoEncodeType::Mjpg => {
let depayloader = gst::ElementFactory::make("rtpjpegdepay").build()?;
let parser = gst::ElementFactory::make("jpegparse").build()?;
let decoder = gst::ElementFactory::make("jpegdec").build()?;
decoder.has_property("discard-corrupted-frames", None).then(|| decoder.set_property("discard-corrupted-frames", true));
_transcoding_elements.push(depayloader);
_transcoding_elements.push(parser);
_transcoding_elements.push(decoder);
}
VideoEncodeType::Yuyv => {
let depayloader = gst::ElementFactory::make("rtpvrawdepay").build()?;
_transcoding_elements.push(depayloader);
}
VideoEncodeType::Yuyv => {}
_ => return Err(anyhow!("Unsupported video encoding for ImageSink: {encoding:?}. The supported are: H264, MJPG and YUYV")),
};

Expand Down
Loading