Skip to content

Commit

Permalink
handle gstramer playing/pause state control
Browse files Browse the repository at this point in the history
deal with the video codec when h265 which fails to work.
prevent gstreamer from breaking with video codec changes.
  • Loading branch information
ltn-chriskennedy committed Aug 22, 2024
1 parent 73ad2e6 commit 6a8de8d
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 43 deletions.
122 changes: 85 additions & 37 deletions src/bin/probe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,10 @@ struct Args {
/// Dump Packets - Dump packets to the console in hex
#[clap(long, env = "DUMP_PACKETS", default_value_t = false)]
dump_packets: bool,

/// Clear Stream Timeout - Clear the streams that are not changing after this amout of time in ms
#[clap(long, env = "CLEAR_STREAM_TIMEOUT", default_value_t = 0)]
clear_stream_timeout: u64,
}

// MAIN Function
Expand Down Expand Up @@ -1607,6 +1611,7 @@ async fn rsprobe(running: Arc<AtomicBool>) {
return;
}
};
let mut gstreamer_playing = false;

// Start the pipeline
#[cfg(feature = "gst")]
Expand All @@ -1618,6 +1623,7 @@ async fn rsprobe(running: Arc<AtomicBool>) {
return;
}
}
gstreamer_playing = true;
}

// Spawn separate tasks for processing video packets and pulling images
Expand Down Expand Up @@ -1699,6 +1705,10 @@ async fn rsprobe(running: Arc<AtomicBool>) {

let mut last_kafka_send_time = Instant::now();

#[cfg(feature = "gst")]
let mut video_packet_errors = 0;
#[cfg(feature = "gst")]

loop {
match prx.try_recv() {
Ok((packet, timestamp, iat)) => {
Expand Down Expand Up @@ -1764,7 +1774,9 @@ async fn rsprobe(running: Arc<AtomicBool>) {
pmt_pid = Some(pid);
}
// Update PID_MAP with new stream types
//cleanup_stale_streams();
if args.clear_stream_timeout > 0 {
cleanup_stale_streams(args.clear_stream_timeout);
}
let program_number_result = update_pid_map(
&packet_chunk,
&pmt_info.packet,
Expand Down Expand Up @@ -1844,44 +1856,80 @@ async fn rsprobe(running: Arc<AtomicBool>) {
// Check if this is a video PID and if so parse NALS and decode video
// Process video packets
#[cfg(feature = "gst")]
if args.extract_images {
#[cfg(feature = "gst")]
let video_packet = Arc::new(
stream_data.packet[stream_data.packet_start
..stream_data.packet_start + stream_data.packet_len]
.to_vec(),
);

// Send the video packet to the processing task
if let Err(_) = video_packet_sender
.try_send(Arc::try_unwrap(video_packet).unwrap_or_default())
{
// If the channel is full, drop the packet
log::warn!("Video packet channel is full. Dropping packet.");
}
{
let video_codec_clone = video_codec.clone();
if Codec::H264 != video_codec_clone.clone().expect("Video Codec failed") && Codec::NONE != video_codec_clone.clone().expect("Video Codec failed") {
if gstreamer_playing == true {
error!("Probe: Codec {} is not H264, pausing gstramer", video_codec_clone.clone().expect("Video Codec failed").to_string());
match pipeline.set_state(gst::State::Paused) {
Ok(_) => (),
Err(err) => {
eprintln!("Failed to set the pipeline state to Paused: {}", err);
return;
}
}
gstreamer_playing = false;
}
} else {
if gstreamer_playing == false {
info!("Probe: Starting Gstreamer pipeline for images");
match pipeline.set_state(gst::State::Playing) {
Ok(_) => (),
Err(err) => {
eprintln!("Failed to set the pipeline state to Playing: {}", err);
return;
}
}
gstreamer_playing = true;
}
if args.extract_images && video_packet_errors <= 32 {
#[cfg(feature = "gst")]
let video_packet = Arc::new(
stream_data.packet[stream_data.packet_start
..stream_data.packet_start + stream_data.packet_len]
.to_vec(),
);

// Receive and process images
#[cfg(feature = "gst")]
if let Ok((image_data, pts, duplicates, hash, hamming)) =
image_receiver.try_recv()
{
log::info!(
"Probe: [{}] Received jpeg image of {} bytes repeated[{}] perceptual hash[{:016X}] hamming difference[{}].",
pts,
image_data.len(),
duplicates,
hash,
hamming,
);
// Send the video packet to the processing task
if let Err(_) = video_packet_sender
.try_send(Arc::try_unwrap(video_packet).unwrap_or_default())
{
// If the channel is full, drop the packet
log::warn!("Video packet channel is full. Dropping packet.");
video_packet_errors += 1;
if video_packet_errors > 32 {
error!("Probe: Video packet channel has {} errors, exiting.", video_packet_errors);
running.store(false, Ordering::SeqCst);
break;
}
} else {
video_packet_errors = 0;
}

// Process the received image data
images.push(ImageData {
image: image_data,
pts: pts,
duplicates: duplicates,
hash: hash,
hamming: hamming,
});
// Receive and process images
#[cfg(feature = "gst")]
if let Ok((image_data, pts, duplicates, hash, hamming)) =
image_receiver.try_recv()
{
log::info!(
"Probe: [{}] Received jpeg image of {} bytes repeated[{}] perceptual hash[{:016X}] hamming difference[{}].",
pts,
image_data.len(),
duplicates,
hash,
hamming,
);

// Process the received image data
images.push(ImageData {
image: image_data,
pts: pts,
duplicates: duplicates,
hash: hash,
hamming: hamming,
});
}
}
}
}

Expand Down
17 changes: 11 additions & 6 deletions src/stream_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ pub fn initialize_pipeline(
// Create a pipeline to extract video frames
let pipeline = match stream_type_number {
0x02 => create_pipeline(
&format!("appsrc name=src ! tsdemux |capsfiter caps=video/mpeg ! \
&format!("appsrc name=src ! tsdemux ! \
mpeg2dec ! videorate ! video/x-raw,framerate={} ! videoconvert ! video/x-raw,format=RGB {} ! \
appsink name=sink", framerate, scale_string),
)?,
Expand All @@ -118,7 +118,7 @@ pub fn initialize_pipeline(
videoconvert ! video/x-raw,format=RGB {} ! appsink name=sink", framerate, scale_string),
)?,
0x24 => create_pipeline(
&format!("appsrc name=src ! tsdemux ! capsfilter caps=video/x-h265 ! \
&format!("appsrc name=src ! tsdemux ! \
h265parse ! avdec_h265 ! videorate ! video/x-raw,framerate={} ! \
videoconvert ! video/x-raw,format=RGB {} ! appsink name=sink", framerate, scale_string),
)?,
Expand Down Expand Up @@ -159,6 +159,7 @@ pub fn process_video_packets(
running: Arc<AtomicBool>,
) {
tokio::spawn(async move {
let mut errors = 0;
while let Some(packet) = video_packet_receiver.recv().await {
if !running.load(Ordering::SeqCst) {
break;
Expand All @@ -168,6 +169,12 @@ pub fn process_video_packets(
// Push buffer only if not full
if let Err(err) = appsrc.push_buffer(buffer) {
log::warn!("Buffer full, dropping packet: {}", err);
errors += 1;
if errors > 100 {
break;
}
} else {
errors = 0;
}
}
});
Expand Down Expand Up @@ -1507,19 +1514,17 @@ pub fn process_packet(
}
}

pub fn cleanup_stale_streams() {
pub fn cleanup_stale_streams(clear_stream_timeout: u64) {
let current_time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_millis() as u64;

let one_minute = 60 * 60000; // 1 minute in milliseconds

let mut pid_map = PID_MAP.write().unwrap();
let stale_pids: Vec<u16> = pid_map
.iter()
.filter_map(|(&pid, stream_data)| {
if current_time.saturating_sub(stream_data.last_arrival_time) > one_minute {
if current_time.saturating_sub(stream_data.last_arrival_time) > clear_stream_timeout{
Some(pid)
} else {
None
Expand Down

0 comments on commit 6a8de8d

Please sign in to comment.