Skip to content

Commit

Permalink
Merge pull request #133 from pipeless-ai/buffersize
Browse files Browse the repository at this point in the history
feat(core): Add backpressure mechanism
  • Loading branch information
miguelaeh authored Feb 20, 2024
2 parents 729b0be + 32382d1 commit 4d8bcea
Show file tree
Hide file tree
Showing 9 changed files with 72 additions and 48 deletions.
2 changes: 1 addition & 1 deletion pipeless/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pipeless/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pipeless-ai"
version = "1.9.0"
version = "1.10.0"
edition = "2021"
authors = ["Miguel A. Cabrera Minagorri"]
description = "An open-source computer vision framework to build and deploy applications in minutes"
Expand Down
4 changes: 2 additions & 2 deletions pipeless/src/cli/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use ctrlc;

use crate as pipeless;

pub fn start_pipeless_node(project_dir: &str, export_redis_events: bool) {
pub fn start_pipeless_node(project_dir: &str, export_redis_events: bool, stream_buffer_size: usize) {
ctrlc::set_handler(|| {
println!("Exiting...");
std::process::exit(0);
Expand Down Expand Up @@ -48,7 +48,7 @@ pub fn start_pipeless_node(project_dir: &str, export_redis_events: bool) {
let streams_table = Arc::new(RwLock::new(pipeless::config::streams::StreamsTable::new()));
let dispatcher = pipeless::dispatcher::Dispatcher::new(streams_table.clone());
let dispatcher_sender = dispatcher.get_sender().clone();
pipeless::dispatcher::start(dispatcher, frame_path_executor);
pipeless::dispatcher::start(dispatcher, frame_path_executor, stream_buffer_size);

// Use the REST adapter to manage streams
let rest_adapter = pipeless::config::adapters::rest::RestAdapter::new(streams_table.clone());
Expand Down
3 changes: 2 additions & 1 deletion pipeless/src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ impl Dispatcher {
pub fn start(
dispatcher: Dispatcher,
frame_path_executor_arc: Arc<RwLock<pipeless::stages::path::FramePathExecutor>>,
buffer_size: usize,
) {
let running_managers: Arc<RwLock<HashMap<uuid::Uuid, pipeless::pipeline::Manager>>> = Arc::new(RwLock::new(HashMap::new()));
let frame_path_executor_arc = frame_path_executor_arc.clone();
Expand Down Expand Up @@ -139,7 +140,7 @@ pub fn start(
match frame_path {
Ok(frame_path) => {
info!("New stream entry detected, creating pipeline");
let new_pipeless_bus = pipeless::events::Bus::new();
let new_pipeless_bus = pipeless::events::Bus::new(buffer_size);
let new_manager_result = pipeless::pipeline::Manager::new(
input_uri, output_uri, frame_path,
&new_pipeless_bus.get_sender(),
Expand Down
78 changes: 49 additions & 29 deletions pipeless/src/events.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use futures::{StreamExt, Future};
use gst::TagList;
use log::{error, info, warn};
use log::{debug, error, info, warn};
use gstreamer as gst;

use crate as pipeless;
Expand All @@ -24,6 +24,7 @@ impl FrameChange {
}
impl EventType for FrameChange {}

#[derive(Clone)]
pub struct TagsChange {
tags: gst::TagList,
}
Expand All @@ -38,6 +39,7 @@ impl TagsChange {
impl EventType for TagsChange {}

// When the input stream stopped sending frames
#[derive(Clone)]
pub struct EndOfInputStream {}
impl EndOfInputStream {
pub fn new() -> Self {
Expand All @@ -47,6 +49,7 @@ impl EndOfInputStream {
impl EventType for EndOfInputStream {}

// When the output stream processed the input EOS
#[derive(Clone)]
pub struct EndOfOutputStream {}
impl EndOfOutputStream {
pub fn new() -> Self {
Expand All @@ -56,6 +59,7 @@ impl EndOfOutputStream {
impl EventType for EndOfOutputStream {}

// When the input stream caps are available
#[derive(Clone)]
pub struct NewInputCaps {
caps: String,
}
Expand All @@ -69,6 +73,7 @@ impl NewInputCaps {
}
impl EventType for NewInputCaps {}

#[derive(Clone)]
pub struct InputStreamError {
msg: String,
}
Expand All @@ -82,6 +87,7 @@ impl InputStreamError {
}
impl EventType for InputStreamError {}

#[derive(Clone)]
pub struct OutputStreamError {
msg: String,
}
Expand Down Expand Up @@ -134,29 +140,39 @@ impl Event {
Self::OutputStreamErrorEvent(output_error)
}
}
// We need to clone the event in the ensure_send function loop
impl Clone for Event {
fn clone(&self) -> Self {
if let Event::FrameChangeEvent(_) = self {
panic!("Cloning FrameChangeEvent events is not allowed because they contain frames.");
}

self.clone()
}
}

/// The bus is used to handle events on the pipelines.
/// working as expected even on different threads
// TODO: we should implement two kind of buses,
// a cloud bus and a local bus. The cloud bus will basically
// be a connection to a message broker.
pub struct Bus {
sender: tokio::sync::mpsc::UnboundedSender<Event>,
sender: tokio::sync::mpsc::Sender<Event>,
// Use a stream receiver to be able to process events concurrently
receiver: tokio_stream::wrappers::UnboundedReceiverStream<Event>,
receiver: tokio_stream::wrappers::ReceiverStream<Event>,
}
impl Bus {
pub fn new() -> Self {
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<Event>();
pub fn new(buffer_size: usize) -> Self {
let (sender, receiver) = tokio::sync::mpsc::channel::<Event>(buffer_size);
Self {
sender,
receiver: tokio_stream::wrappers::UnboundedReceiverStream::new(
receiver: tokio_stream::wrappers::ReceiverStream::new(
receiver
),
}
}

pub fn get_sender(&self) -> tokio::sync::mpsc::UnboundedSender<Event> {
pub fn get_sender(&self) -> tokio::sync::mpsc::Sender<Event> {
self.sender.clone()
}

Expand All @@ -179,77 +195,81 @@ impl Bus {
/*
Utils to produce sync events. Can be called anywhere within sync code.
We use them to publish events from Gstreamer pipeline callback.
NOTE: We can use the send method in both, sync and async contexts, only
because the tokio unbounded channel never requires any form of waiting.
Before moving to Tokio channels, we were using the async_channels crate,
and we had to create different methods for sync and async code since
we cannot await in the Gstreamer callbacks
*/

pub fn publish_new_frame_change_event_sync(
bus_sender: &tokio::sync::mpsc::UnboundedSender<Event>,
bus_sender: &tokio::sync::mpsc::Sender<Event>,
frame: pipeless::data::Frame
) {
let new_frame_event = Event::new_frame_change(frame);
if let Err(err) = bus_sender.send(new_frame_event) {
warn!("Error sending frame change event: {}", err);
// By using try_send frames are discarded when the channel is full
if let Err(err) = bus_sender.try_send(new_frame_event) {
debug!("Discarding frame: {}", err);
}
}

pub fn publish_input_eos_event_sync(
bus_sender: &tokio::sync::mpsc::UnboundedSender<Event>,
bus_sender: &tokio::sync::mpsc::Sender<Event>,
) {
let eos_event = Event::new_end_of_input_stream();
if let Err(err) = bus_sender.send(eos_event) {
if let Err(err) = ensure_send(bus_sender, eos_event) {
warn!("Error sending input EOS event: {}", err);
}
}

pub fn publish_ouptut_eos_event_sync(
bus_sender: &tokio::sync::mpsc::UnboundedSender<Event>,
bus_sender: &tokio::sync::mpsc::Sender<Event>,
) {
let eos_event = Event::new_end_of_output_stream();
if let Err(err) = bus_sender.send(eos_event) {
if let Err(err) = ensure_send(bus_sender, eos_event) {
warn!("Error sending output EOS event: {}", err);
}
}

pub fn publish_input_tags_changed_event_sync(
bus_sender: &tokio::sync::mpsc::UnboundedSender<Event>,
bus_sender: &tokio::sync::mpsc::Sender<Event>,
tags: gst::TagList
) {
let tags_change_event = Event::new_tags_change(tags);
if let Err(err) = bus_sender.send(tags_change_event) {
if let Err(err) = ensure_send(bus_sender, tags_change_event) {
warn!("Error sending tags change event: {}", err);
}
}

pub fn publish_new_input_caps_event_sync(
bus_sender: &tokio::sync::mpsc::UnboundedSender<Event>,
bus_sender: &tokio::sync::mpsc::Sender<Event>,
caps: String
) {
let new_input_caps_event = Event::new_input_caps(caps);
if let Err(err) = bus_sender.send(new_input_caps_event) {
if let Err(err) = ensure_send(bus_sender, new_input_caps_event) {
warn!("Error sending new input caps event: {}", err);
}
}

pub fn publish_input_stream_error_event_sync(
bus_sender: &tokio::sync::mpsc::UnboundedSender<Event>,
bus_sender: &tokio::sync::mpsc::Sender<Event>,
err: &str
) {
let input_stream_error_event = Event::new_input_stream_error(err);
if let Err(err) = bus_sender.send(input_stream_error_event) {
if let Err(err) = ensure_send(bus_sender, input_stream_error_event) {
warn!("Error sending input stream error event: {}", err);
}
}

pub fn publish_output_stream_error_event_sync(
bus_sender: &tokio::sync::mpsc::UnboundedSender<Event>,
bus_sender: &tokio::sync::mpsc::Sender<Event>,
err: &str
) {
let output_stream_error_event = Event::new_output_stream_error(err);
if let Err(err) = bus_sender.send(output_stream_error_event) {
if let Err(err) = ensure_send(bus_sender, output_stream_error_event) {
warn!("Error sending output stream error event: {}", err);
}
}
}

fn ensure_send(tx: &tokio::sync::mpsc::Sender<Event>, event: Event) -> Result<(), String> {
// NOTE: this is not optimal, but we cannot await from the gstreamer code. Ideally we should use send() which will await until there is space on the channel
if let Err(err) = tokio::task::block_in_place(|| tx.blocking_send(event)) {
return Err(format!("Failed to send event: {}", err.to_string()));
}
Ok(())
}
12 changes: 6 additions & 6 deletions pipeless/src/input/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl StreamDef {
fn on_new_sample(
pipeless_pipeline_id: uuid::Uuid,
appsink: &gst_app::AppSink,
pipeless_bus_sender: &tokio::sync::mpsc::UnboundedSender<pipeless::events::Event>,
pipeless_bus_sender: &tokio::sync::mpsc::Sender<pipeless::events::Event>,
frame_number: &mut u64,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let sample = appsink.pull_sample().map_err(|_err| {
Expand Down Expand Up @@ -135,7 +135,7 @@ fn on_new_sample(
fn on_pad_added (
pad: &gst::Pad,
_info: &mut gst::PadProbeInfo,
pipeless_bus_sender: &tokio::sync::mpsc::UnboundedSender<pipeless::events::Event>,
pipeless_bus_sender: &tokio::sync::mpsc::Sender<pipeless::events::Event>,
) -> gst::PadProbeReturn {
let caps = match pad.current_caps() {
Some(c) => c,
Expand All @@ -157,7 +157,7 @@ fn on_pad_added (

fn create_input_bin(
uri: &str,
pipeless_bus_sender: &tokio::sync::mpsc::UnboundedSender<pipeless::events::Event>,
pipeless_bus_sender: &tokio::sync::mpsc::Sender<pipeless::events::Event>,
) -> Result<gst::Bin, InputPipelineError> {
let bin = gst::Bin::new();
if uri.starts_with("v4l2") { // Device webcam
Expand Down Expand Up @@ -317,7 +317,7 @@ fn create_input_bin(
fn on_bus_message(
msg: &gst::Message,
pipeline_id: uuid::Uuid,
pipeless_bus_sender: &tokio::sync::mpsc::UnboundedSender<pipeless::events::Event>,
pipeless_bus_sender: &tokio::sync::mpsc::Sender<pipeless::events::Event>,
) {
match msg.view() {
gst::MessageView::Eos(eos) => {
Expand Down Expand Up @@ -395,7 +395,7 @@ fn on_bus_message(
fn create_gst_pipeline(
pipeless_pipeline_id: uuid::Uuid,
input_uri: &str,
pipeless_bus_sender: &tokio::sync::mpsc::UnboundedSender<pipeless::events::Event>,
pipeless_bus_sender: &tokio::sync::mpsc::Sender<pipeless::events::Event>,
) -> Result<gst::Pipeline, InputPipelineError> {
let pipeline = gst::Pipeline::new();
let input_bin = create_input_bin(input_uri, pipeless_bus_sender)?;
Expand Down Expand Up @@ -445,7 +445,7 @@ impl Pipeline {
pub fn new(
id: uuid::Uuid,
stream: pipeless::input::pipeline::StreamDef,
pipeless_bus_sender: &tokio::sync::mpsc::UnboundedSender<pipeless::events::Event>,
pipeless_bus_sender: &tokio::sync::mpsc::Sender<pipeless::events::Event>,
) -> Result<Self, InputPipelineError> {
let input_uri = stream.get_video().get_uri();
let gst_pipeline = create_gst_pipeline(id, input_uri, pipeless_bus_sender)?;
Expand Down
7 changes: 5 additions & 2 deletions pipeless/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,12 @@ enum Commands {
/// Pipeless project directory
#[clap(short, long, alias = "stages-dir")]
project_dir: String,
/// Enable event export via Redis
/// Optional. Enable event export via Redis
#[clap(short, long)]
export_events_redis: bool,
/// Optional. Max buffer size for each stream, measured in number of frames. Serves as backpressure mechanism. When the buffer is full new frames are discarded until there is space again in the buffer.
#[clap(short, long, default_value = "240")]
stream_buffer_size: usize,
},
/// Add resources such as streams
Add {
Expand Down Expand Up @@ -124,7 +127,7 @@ fn main() {

match &cli.command {
Some(Commands::Init { project_name , template}) => pipeless_ai::cli::init::init(&project_name, template),
Some(Commands::Start { project_dir , export_events_redis }) => pipeless_ai::cli::start::start_pipeless_node(&project_dir, *export_events_redis),
Some(Commands::Start { project_dir , export_events_redis , stream_buffer_size}) => pipeless_ai::cli::start::start_pipeless_node(&project_dir, *export_events_redis, *stream_buffer_size),
Some(Commands::Add { command }) => {
match &command {
Some(AddCommand::Stream { input_uri, output_uri, frame_path , restart_policy}) => pipeless_ai::cli::streams::add(input_uri, output_uri, frame_path, restart_policy),
Expand Down
6 changes: 3 additions & 3 deletions pipeless/src/output/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ fn create_sink(stream: &StreamDef) -> Result<gst::Element, BoolError> {
fn on_bus_message(
msg: &gst::Message,
pipeline_id: uuid::Uuid,
pipeless_bus_sender: &tokio::sync::mpsc::UnboundedSender<pipeless::events::Event>,
pipeless_bus_sender: &tokio::sync::mpsc::Sender<pipeless::events::Event>,
) {
match msg.view() {
gst::MessageView::Eos(_eos) => {
Expand Down Expand Up @@ -405,7 +405,7 @@ impl Pipeline {
id: uuid::Uuid,
stream: pipeless::output::pipeline::StreamDef,
caps: &str,
pipeless_bus_sender: &tokio::sync::mpsc::UnboundedSender<pipeless::events::Event>,
pipeless_bus_sender: &tokio::sync::mpsc::Sender<pipeless::events::Event>,
) -> Result<Self, OutputPipelineError> {
let (gst_pipeline, buffer_pool) = create_gst_pipeline(&stream, caps)?;
let pipeline = Pipeline {
Expand Down Expand Up @@ -458,7 +458,7 @@ impl Pipeline {
pub fn on_new_frame(
&self,
frame: pipeless::data::Frame,
pipeless_bus_sender: &tokio::sync::mpsc::UnboundedSender<pipeless::events::Event>
pipeless_bus_sender: &tokio::sync::mpsc::Sender<pipeless::events::Event>
) -> Result<(), OutputPipelineError>{
match frame {
pipeless::data::Frame::RgbFrame(mut rgb_frame) => {
Expand Down
6 changes: 3 additions & 3 deletions pipeless/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ struct Pipeline {
}
impl Pipeline {
fn new(
pipeless_bus_sender: &tokio::sync::mpsc::UnboundedSender<pipeless::events::Event>,
pipeless_bus_sender: &tokio::sync::mpsc::Sender<pipeless::events::Event>,
input_uri: String,
output_uri: Option<String>,
frames_path: pipeless::stages::path::FramePath,
Expand Down Expand Up @@ -100,7 +100,7 @@ impl Pipeline {
pub fn create_and_start_output_pipeline(
&mut self,
input_caps: String,
pipeless_bus_sender: &tokio::sync::mpsc::UnboundedSender<pipeless::events::Event>,
pipeless_bus_sender: &tokio::sync::mpsc::Sender<pipeless::events::Event>,
) -> Result<(), pipeless::output::pipeline::OutputPipelineError> {
if let Some(stream_def) = &self.output_stream_def {
// TODO: build streamdefs within pipelines and pass the uri only
Expand Down Expand Up @@ -156,7 +156,7 @@ impl Manager {
output_video_uri: Option<String>,
frames_path: pipeless::stages::path::FramePath,
// The bus needs to be created before the pipeline
pipeless_bus_sender: &tokio::sync::mpsc::UnboundedSender<pipeless::events::Event>,
pipeless_bus_sender: &tokio::sync::mpsc::Sender<pipeless::events::Event>,
dispatcher_sender: tokio::sync::mpsc::UnboundedSender<pipeless::dispatcher::DispatcherEvent>,
) -> Result<Self, PipelineError> {
let pipeline = Arc::new(RwLock::new(pipeless::pipeline::Pipeline::new(
Expand Down

0 comments on commit 4d8bcea

Please sign in to comment.