diff --git a/pipeless/Cargo.lock b/pipeless/Cargo.lock index f115544..45239b4 100644 --- a/pipeless/Cargo.lock +++ b/pipeless/Cargo.lock @@ -1513,7 +1513,7 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "pipeless-ai" -version = "1.9.0" +version = "1.10.0" dependencies = [ "clap", "ctrlc", diff --git a/pipeless/Cargo.toml b/pipeless/Cargo.toml index b710675..9db4cf8 100644 --- a/pipeless/Cargo.toml +++ b/pipeless/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pipeless-ai" -version = "1.9.0" +version = "1.10.0" edition = "2021" authors = ["Miguel A. Cabrera Minagorri"] description = "An open-source computer vision framework to build and deploy applications in minutes" diff --git a/pipeless/src/cli/start.rs b/pipeless/src/cli/start.rs index 54595a2..ac016aa 100644 --- a/pipeless/src/cli/start.rs +++ b/pipeless/src/cli/start.rs @@ -8,7 +8,7 @@ use ctrlc; use crate as pipeless; -pub fn start_pipeless_node(project_dir: &str, export_redis_events: bool) { +pub fn start_pipeless_node(project_dir: &str, export_redis_events: bool, stream_buffer_size: usize) { ctrlc::set_handler(|| { println!("Exiting..."); std::process::exit(0); @@ -48,7 +48,7 @@ pub fn start_pipeless_node(project_dir: &str, export_redis_events: bool) { let streams_table = Arc::new(RwLock::new(pipeless::config::streams::StreamsTable::new())); let dispatcher = pipeless::dispatcher::Dispatcher::new(streams_table.clone()); let dispatcher_sender = dispatcher.get_sender().clone(); - pipeless::dispatcher::start(dispatcher, frame_path_executor); + pipeless::dispatcher::start(dispatcher, frame_path_executor, stream_buffer_size); // Use the REST adapter to manage streams let rest_adapter = pipeless::config::adapters::rest::RestAdapter::new(streams_table.clone()); diff --git a/pipeless/src/dispatcher.rs b/pipeless/src/dispatcher.rs index e093840..3ce53dc 100644 --- a/pipeless/src/dispatcher.rs +++ b/pipeless/src/dispatcher.rs @@ -68,6 +68,7 @@ impl Dispatcher { pub fn start( dispatcher: Dispatcher, frame_path_executor_arc: Arc>, + buffer_size: usize, ) { let running_managers: Arc>> = Arc::new(RwLock::new(HashMap::new())); let frame_path_executor_arc = frame_path_executor_arc.clone(); @@ -139,7 +140,7 @@ pub fn start( match frame_path { Ok(frame_path) => { info!("New stream entry detected, creating pipeline"); - let new_pipeless_bus = pipeless::events::Bus::new(); + let new_pipeless_bus = pipeless::events::Bus::new(buffer_size); let new_manager_result = pipeless::pipeline::Manager::new( input_uri, output_uri, frame_path, &new_pipeless_bus.get_sender(), diff --git a/pipeless/src/events.rs b/pipeless/src/events.rs index 99e35f6..c70024f 100644 --- a/pipeless/src/events.rs +++ b/pipeless/src/events.rs @@ -1,6 +1,6 @@ use futures::{StreamExt, Future}; use gst::TagList; -use log::{error, info, warn}; +use log::{debug, error, info, warn}; use gstreamer as gst; use crate as pipeless; @@ -24,6 +24,7 @@ impl FrameChange { } impl EventType for FrameChange {} +#[derive(Clone)] pub struct TagsChange { tags: gst::TagList, } @@ -38,6 +39,7 @@ impl TagsChange { impl EventType for TagsChange {} // When the input stream stopped sending frames +#[derive(Clone)] pub struct EndOfInputStream {} impl EndOfInputStream { pub fn new() -> Self { @@ -47,6 +49,7 @@ impl EndOfInputStream { impl EventType for EndOfInputStream {} // When the output stream processed the input EOS +#[derive(Clone)] pub struct EndOfOutputStream {} impl EndOfOutputStream { pub fn new() -> Self { @@ -56,6 +59,7 @@ impl EndOfOutputStream { impl EventType for EndOfOutputStream {} // When the input stream caps are available +#[derive(Clone)] pub struct NewInputCaps { caps: String, } @@ -69,6 +73,7 @@ impl NewInputCaps { } impl EventType for NewInputCaps {} +#[derive(Clone)] pub struct InputStreamError { msg: String, } @@ -82,6 +87,7 @@ impl InputStreamError { } impl EventType for InputStreamError {} +#[derive(Clone)] pub struct OutputStreamError { msg: String, } @@ -134,6 +140,16 @@ impl Event { Self::OutputStreamErrorEvent(output_error) } } +// We need to clone the event in the ensure_send function loop +impl Clone for Event { + fn clone(&self) -> Self { + if let Event::FrameChangeEvent(_) = self { + panic!("Cloning FrameChangeEvent events is not allowed because they contain frames."); + } + + self.clone() + } +} /// The bus is used to handle events on the pipelines. /// working as expected even on different threads @@ -141,22 +157,22 @@ impl Event { // a cloud bus and a local bus. The cloud bus will basically // be a connection to a message broker. pub struct Bus { - sender: tokio::sync::mpsc::UnboundedSender, + sender: tokio::sync::mpsc::Sender, // Use a stream receiver to be able to process events concurrently - receiver: tokio_stream::wrappers::UnboundedReceiverStream, + receiver: tokio_stream::wrappers::ReceiverStream, } impl Bus { - pub fn new() -> Self { - let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::(); + pub fn new(buffer_size: usize) -> Self { + let (sender, receiver) = tokio::sync::mpsc::channel::(buffer_size); Self { sender, - receiver: tokio_stream::wrappers::UnboundedReceiverStream::new( + receiver: tokio_stream::wrappers::ReceiverStream::new( receiver ), } } - pub fn get_sender(&self) -> tokio::sync::mpsc::UnboundedSender { + pub fn get_sender(&self) -> tokio::sync::mpsc::Sender { self.sender.clone() } @@ -179,77 +195,81 @@ impl Bus { /* Utils to produce sync events. Can be called anywhere within sync code. We use them to publish events from Gstreamer pipeline callback. - -NOTE: We can use the send method in both, sync and async contexts, only -because the tokio unbounded channel never requires any form of waiting. -Before moving to Tokio channels, we were using the async_channels crate, -and we had to create different methods for sync and async code since -we cannot await in the Gstreamer callbacks */ + pub fn publish_new_frame_change_event_sync( - bus_sender: &tokio::sync::mpsc::UnboundedSender, + bus_sender: &tokio::sync::mpsc::Sender, frame: pipeless::data::Frame ) { let new_frame_event = Event::new_frame_change(frame); - if let Err(err) = bus_sender.send(new_frame_event) { - warn!("Error sending frame change event: {}", err); + // By using try_send frames are discarded when the channel is full + if let Err(err) = bus_sender.try_send(new_frame_event) { + debug!("Discarding frame: {}", err); } } pub fn publish_input_eos_event_sync( - bus_sender: &tokio::sync::mpsc::UnboundedSender, + bus_sender: &tokio::sync::mpsc::Sender, ) { let eos_event = Event::new_end_of_input_stream(); - if let Err(err) = bus_sender.send(eos_event) { + if let Err(err) = ensure_send(bus_sender, eos_event) { warn!("Error sending input EOS event: {}", err); } } pub fn publish_ouptut_eos_event_sync( - bus_sender: &tokio::sync::mpsc::UnboundedSender, + bus_sender: &tokio::sync::mpsc::Sender, ) { let eos_event = Event::new_end_of_output_stream(); - if let Err(err) = bus_sender.send(eos_event) { + if let Err(err) = ensure_send(bus_sender, eos_event) { warn!("Error sending output EOS event: {}", err); } } pub fn publish_input_tags_changed_event_sync( - bus_sender: &tokio::sync::mpsc::UnboundedSender, + bus_sender: &tokio::sync::mpsc::Sender, tags: gst::TagList ) { let tags_change_event = Event::new_tags_change(tags); - if let Err(err) = bus_sender.send(tags_change_event) { + if let Err(err) = ensure_send(bus_sender, tags_change_event) { warn!("Error sending tags change event: {}", err); } } pub fn publish_new_input_caps_event_sync( - bus_sender: &tokio::sync::mpsc::UnboundedSender, + bus_sender: &tokio::sync::mpsc::Sender, caps: String ) { let new_input_caps_event = Event::new_input_caps(caps); - if let Err(err) = bus_sender.send(new_input_caps_event) { + if let Err(err) = ensure_send(bus_sender, new_input_caps_event) { warn!("Error sending new input caps event: {}", err); } } pub fn publish_input_stream_error_event_sync( - bus_sender: &tokio::sync::mpsc::UnboundedSender, + bus_sender: &tokio::sync::mpsc::Sender, err: &str ) { let input_stream_error_event = Event::new_input_stream_error(err); - if let Err(err) = bus_sender.send(input_stream_error_event) { + if let Err(err) = ensure_send(bus_sender, input_stream_error_event) { warn!("Error sending input stream error event: {}", err); } } pub fn publish_output_stream_error_event_sync( - bus_sender: &tokio::sync::mpsc::UnboundedSender, + bus_sender: &tokio::sync::mpsc::Sender, err: &str ) { let output_stream_error_event = Event::new_output_stream_error(err); - if let Err(err) = bus_sender.send(output_stream_error_event) { + if let Err(err) = ensure_send(bus_sender, output_stream_error_event) { warn!("Error sending output stream error event: {}", err); } -} \ No newline at end of file +} + +fn ensure_send(tx: &tokio::sync::mpsc::Sender, event: Event) -> Result<(), String> { + // NOTE: this is not optimal, but we cannot await from the gstreamer code. Ideally we should use send() which will await until there is space on the channel + if let Err(err) = tokio::task::block_in_place(|| tx.blocking_send(event)) { + return Err(format!("Failed to send event: {}", err.to_string())); + } + Ok(()) +} diff --git a/pipeless/src/input/pipeline.rs b/pipeless/src/input/pipeline.rs index 72d9faf..4949b08 100644 --- a/pipeless/src/input/pipeline.rs +++ b/pipeless/src/input/pipeline.rs @@ -60,7 +60,7 @@ impl StreamDef { fn on_new_sample( pipeless_pipeline_id: uuid::Uuid, appsink: &gst_app::AppSink, - pipeless_bus_sender: &tokio::sync::mpsc::UnboundedSender, + pipeless_bus_sender: &tokio::sync::mpsc::Sender, frame_number: &mut u64, ) -> Result { let sample = appsink.pull_sample().map_err(|_err| { @@ -135,7 +135,7 @@ fn on_new_sample( fn on_pad_added ( pad: &gst::Pad, _info: &mut gst::PadProbeInfo, - pipeless_bus_sender: &tokio::sync::mpsc::UnboundedSender, + pipeless_bus_sender: &tokio::sync::mpsc::Sender, ) -> gst::PadProbeReturn { let caps = match pad.current_caps() { Some(c) => c, @@ -157,7 +157,7 @@ fn on_pad_added ( fn create_input_bin( uri: &str, - pipeless_bus_sender: &tokio::sync::mpsc::UnboundedSender, + pipeless_bus_sender: &tokio::sync::mpsc::Sender, ) -> Result { let bin = gst::Bin::new(); if uri.starts_with("v4l2") { // Device webcam @@ -317,7 +317,7 @@ fn create_input_bin( fn on_bus_message( msg: &gst::Message, pipeline_id: uuid::Uuid, - pipeless_bus_sender: &tokio::sync::mpsc::UnboundedSender, + pipeless_bus_sender: &tokio::sync::mpsc::Sender, ) { match msg.view() { gst::MessageView::Eos(eos) => { @@ -395,7 +395,7 @@ fn on_bus_message( fn create_gst_pipeline( pipeless_pipeline_id: uuid::Uuid, input_uri: &str, - pipeless_bus_sender: &tokio::sync::mpsc::UnboundedSender, + pipeless_bus_sender: &tokio::sync::mpsc::Sender, ) -> Result { let pipeline = gst::Pipeline::new(); let input_bin = create_input_bin(input_uri, pipeless_bus_sender)?; @@ -445,7 +445,7 @@ impl Pipeline { pub fn new( id: uuid::Uuid, stream: pipeless::input::pipeline::StreamDef, - pipeless_bus_sender: &tokio::sync::mpsc::UnboundedSender, + pipeless_bus_sender: &tokio::sync::mpsc::Sender, ) -> Result { let input_uri = stream.get_video().get_uri(); let gst_pipeline = create_gst_pipeline(id, input_uri, pipeless_bus_sender)?; diff --git a/pipeless/src/main.rs b/pipeless/src/main.rs index 4181505..b542240 100644 --- a/pipeless/src/main.rs +++ b/pipeless/src/main.rs @@ -88,9 +88,12 @@ enum Commands { /// Pipeless project directory #[clap(short, long, alias = "stages-dir")] project_dir: String, - /// Enable event export via Redis + /// Optional. Enable event export via Redis #[clap(short, long)] export_events_redis: bool, + /// Optional. Max buffer size for each stream, measured in number of frames. Serves as backpressure mechanism. When the buffer is full new frames are discarded until there is space again in the buffer. + #[clap(short, long, default_value = "240")] + stream_buffer_size: usize, }, /// Add resources such as streams Add { @@ -124,7 +127,7 @@ fn main() { match &cli.command { Some(Commands::Init { project_name , template}) => pipeless_ai::cli::init::init(&project_name, template), - Some(Commands::Start { project_dir , export_events_redis }) => pipeless_ai::cli::start::start_pipeless_node(&project_dir, *export_events_redis), + Some(Commands::Start { project_dir , export_events_redis , stream_buffer_size}) => pipeless_ai::cli::start::start_pipeless_node(&project_dir, *export_events_redis, *stream_buffer_size), Some(Commands::Add { command }) => { match &command { Some(AddCommand::Stream { input_uri, output_uri, frame_path , restart_policy}) => pipeless_ai::cli::streams::add(input_uri, output_uri, frame_path, restart_policy), diff --git a/pipeless/src/output/pipeline.rs b/pipeless/src/output/pipeline.rs index 3e25619..f1ba2c4 100644 --- a/pipeless/src/output/pipeline.rs +++ b/pipeless/src/output/pipeline.rs @@ -261,7 +261,7 @@ fn create_sink(stream: &StreamDef) -> Result { fn on_bus_message( msg: &gst::Message, pipeline_id: uuid::Uuid, - pipeless_bus_sender: &tokio::sync::mpsc::UnboundedSender, + pipeless_bus_sender: &tokio::sync::mpsc::Sender, ) { match msg.view() { gst::MessageView::Eos(_eos) => { @@ -405,7 +405,7 @@ impl Pipeline { id: uuid::Uuid, stream: pipeless::output::pipeline::StreamDef, caps: &str, - pipeless_bus_sender: &tokio::sync::mpsc::UnboundedSender, + pipeless_bus_sender: &tokio::sync::mpsc::Sender, ) -> Result { let (gst_pipeline, buffer_pool) = create_gst_pipeline(&stream, caps)?; let pipeline = Pipeline { @@ -458,7 +458,7 @@ impl Pipeline { pub fn on_new_frame( &self, frame: pipeless::data::Frame, - pipeless_bus_sender: &tokio::sync::mpsc::UnboundedSender + pipeless_bus_sender: &tokio::sync::mpsc::Sender ) -> Result<(), OutputPipelineError>{ match frame { pipeless::data::Frame::RgbFrame(mut rgb_frame) => { diff --git a/pipeless/src/pipeline.rs b/pipeless/src/pipeline.rs index cfecb3a..91a7f77 100644 --- a/pipeless/src/pipeline.rs +++ b/pipeless/src/pipeline.rs @@ -63,7 +63,7 @@ struct Pipeline { } impl Pipeline { fn new( - pipeless_bus_sender: &tokio::sync::mpsc::UnboundedSender, + pipeless_bus_sender: &tokio::sync::mpsc::Sender, input_uri: String, output_uri: Option, frames_path: pipeless::stages::path::FramePath, @@ -100,7 +100,7 @@ impl Pipeline { pub fn create_and_start_output_pipeline( &mut self, input_caps: String, - pipeless_bus_sender: &tokio::sync::mpsc::UnboundedSender, + pipeless_bus_sender: &tokio::sync::mpsc::Sender, ) -> Result<(), pipeless::output::pipeline::OutputPipelineError> { if let Some(stream_def) = &self.output_stream_def { // TODO: build streamdefs within pipelines and pass the uri only @@ -156,7 +156,7 @@ impl Manager { output_video_uri: Option, frames_path: pipeless::stages::path::FramePath, // The bus needs to be created before the pipeline - pipeless_bus_sender: &tokio::sync::mpsc::UnboundedSender, + pipeless_bus_sender: &tokio::sync::mpsc::Sender, dispatcher_sender: tokio::sync::mpsc::UnboundedSender, ) -> Result { let pipeline = Arc::new(RwLock::new(pipeless::pipeline::Pipeline::new(