Skip to content

Commit

Permalink
force exit on bad input
Browse files Browse the repository at this point in the history
  • Loading branch information
ltn-chriskennedy committed Aug 28, 2024
1 parent 6146e8e commit 5d20744
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 54 deletions.
85 changes: 34 additions & 51 deletions src/bin/probe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,7 @@ async fn main() {
_ = ctrl_c => {
println!("\nCtrl-C received, shutting down");
running.store(false, Ordering::SeqCst);
std::process::exit(1);
}
_ = rsprobe(running.clone()) => {
println!("\nRsProbe exited");
Expand Down Expand Up @@ -1022,6 +1023,9 @@ async fn rsprobe(running: Arc<AtomicBool>) {
while running_kafka.load(Ordering::SeqCst) {
while let Some((mut batch, mut logs, mut images, pid_map, tr101290)) = krx.recv().await
{
if !running_kafka.load(Ordering::SeqCst) {
break;
}
debug!("Kafka received PID Map: {:#?}", pid_map);
debug!("Kafka TR101290 Errors: {:#?}", tr101290);

Expand Down Expand Up @@ -1711,10 +1715,17 @@ async fn rsprobe(running: Arc<AtomicBool>) {
let mut video_packet_errors = 0;
#[cfg(feature = "gst")]

let mut exit_now = false;
loop {
if !running.load(Ordering::SeqCst) {
break;
}
match prx.try_recv() {
Ok((packet, timestamp, iat)) => {
if args.packet_count > 0 && packets_captured > args.packet_count {
if !running.load(Ordering::SeqCst) {
break;
}
if exit_now || (args.packet_count > 0 && packets_captured > args.packet_count) {
println!(
"\nPacket count limit reached {}/{}, signaling termination...",
packets_captured, args.packet_count
Expand Down Expand Up @@ -1818,13 +1829,18 @@ async fn rsprobe(running: Arc<AtomicBool>) {
stream_data.capture_time,
stream_data.program_number,
new_codec,
video_codec.unwrap(),
video_codec.clone().unwrap(),
stream_data.stream_type_number,
stream_data.stream_type
);
video_codec = Some(new_codec);
// Reset video frame as the codec has changed
current_video_frame.clear();
if video_pid != Some(0xFFFF) {
// Force exit since codec has changed
exit_now = true;
} else {
video_codec = Some(new_codec);
// Reset video frame as the codec has changed
current_video_frame.clear();
}
}
}
}
Expand Down Expand Up @@ -1893,7 +1909,6 @@ async fn rsprobe(running: Arc<AtomicBool>) {
);

// Send the video packet to the processing task
let mut exit_now = false;
if let Err(_) = video_packet_sender
.try_send(Arc::try_unwrap(video_packet).unwrap_or_default())
{
Expand All @@ -1902,38 +1917,12 @@ async fn rsprobe(running: Arc<AtomicBool>) {
video_packet_errors += 1;
if video_packet_errors > 300 {
eprintln!("Probe: Video packet channel has {} errors, restarting Gstreamer.", video_packet_errors);
running.store(false, Ordering::SeqCst);
exit_now = true;

// Pause Gstreamer
if gstreamer_playing == true {
eprintln!("Probe: Too many errors with gstreamer {}, pausing video images", video_packet_errors);
match pipeline.set_state(gst::State::Paused) {
Ok(_) => (),
Err(err) => {
eprintln!("Failed to set the pipeline state to Paused: {}", err);
running.store(false, Ordering::SeqCst);
exit_now = true;
}
}
match pipeline.set_state(gst::State::Playing) {
Ok(_) => (),
Err(err) => {
eprintln!("Failed to set the pipeline state to Playing: {}", err);
running.store(false, Ordering::SeqCst);
exit_now = true;
}
}
}
}
} else {
video_packet_errors = 0;
}

if exit_now {
break;
}

// Receive and process images
#[cfg(feature = "gst")]
if let Ok((image_data, pts, duplicates, hash, hamming)) =
Expand Down Expand Up @@ -2041,6 +2030,11 @@ async fn rsprobe(running: Arc<AtomicBool>) {
break;
}
}

if exit_now == true {
running.store(false, Ordering::SeqCst);
break;
}
}

println!("\nSending stop signals to threads...");
Expand All @@ -2049,6 +2043,12 @@ async fn rsprobe(running: Arc<AtomicBool>) {
#[cfg(feature = "gst")]
if args.extract_images {
if gstreamer_playing == true {
match pipeline.set_state(gst::State::Paused) {
Ok(_) => (),
Err(err) => {
eprintln!("Failed to set the pipeline state to Paused: {}", err);
}
}
match pipeline.set_state(gst::State::Null) {
Ok(_) => (),
Err(err) => {
Expand All @@ -2058,23 +2058,6 @@ async fn rsprobe(running: Arc<AtomicBool>) {
}
}

println!("\nWaiting for threads to finish...");

// Send Kafka stop signal
if !kafka_broker_clone3.is_empty() && !kafka_topic_clone3.is_empty() {
let _ = ktx_clone2.try_send((
Vec::new(),
Vec::new(),
Vec::new(),
AHashMap::new(),
Tr101290Errors::new(),
));
drop(ktx_clone2);
// Wait for the kafka thread to finish
kafka_thread.await.unwrap();
}

capture_task.await.unwrap();

println!("\nThreads finished, exiting rsprobe");
println!("\nExiting RsCap...");
std::process::exit(1);
}
4 changes: 1 addition & 3 deletions src/stream_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,9 +170,7 @@ pub fn process_video_packets(
if let Err(err) = appsrc.push_buffer(buffer) {
eprintln!("process_video_packets: Gstreamer Buffer full with {} errors, dropping packet: {}", errors, err);
errors += 1;
if errors > 1000 && errors < 1010 {
eprintln!("process_video_packerts: Too many errors in a row with gstreamer {}!", errors);
}
break;
} else {
errors = 0;
}
Expand Down

0 comments on commit 5d20744

Please sign in to comment.