diff --git a/pipeless/Cargo.lock b/pipeless/Cargo.lock index 2ea425d..cd92b73 100644 --- a/pipeless/Cargo.lock +++ b/pipeless/Cargo.lock @@ -74,6 +74,17 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "async-trait" +version = "0.1.74" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a66537f1bb974b254c98ed142ff995236e81b9d0fe4db0575f46612cb15eb0f9" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.38", +] + [[package]] name = "atomic_refcell" version = "0.1.13" @@ -220,6 +231,20 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" +[[package]] +name = "combine" +version = "4.6.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35ed6e9d84f0b51a7f52daf1c7d71dd136fd7a3f41a8462b8cdb8c78d920fad4" +dependencies = [ + "bytes", + "futures-core", + "memchr", + "pin-project-lite", + "tokio", + "tokio-util", +] + [[package]] name = "core-foundation" version = "0.9.3" @@ -1488,7 +1513,7 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "pipeless-ai" -version = "1.6.3" +version = "1.7.0" dependencies = [ "clap", "ctrlc", @@ -1508,6 +1533,7 @@ dependencies = [ "ort", "pyo3", "rayon", + "redis", "reqwest", "serde", "serde_derive", @@ -1591,7 +1617,7 @@ dependencies = [ "indoc", "libc", "memoffset", - "parking_lot 0.11.2", + "parking_lot 0.12.1", "pyo3-build-config", "pyo3-ffi", "pyo3-macros", @@ -1707,6 +1733,27 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "redis" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c580d9cbbe1d1b479e8d67cf9daf6a62c957e6846048408b80b43ac3f6af84cd" +dependencies = [ + "async-trait", + "bytes", + "combine", + "futures-util", + "itoa", + "percent-encoding", + "pin-project-lite", + "ryu", + "sha1_smol", + "socket2 0.4.10", + "tokio", + 
"tokio-util", + "url", +] + [[package]] name = "redox_syscall" version = "0.2.16" @@ -1994,6 +2041,12 @@ dependencies = [ "digest", ] +[[package]] +name = "sha1_smol" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012" + [[package]] name = "signal-hook" version = "0.3.17" diff --git a/pipeless/Cargo.toml b/pipeless/Cargo.toml index f3718ae..c92ee92 100644 --- a/pipeless/Cargo.toml +++ b/pipeless/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pipeless-ai" -version = "1.6.3" +version = "1.7.0" edition = "2021" authors = ["Miguel A. Cabrera Minagorri"] description = "An open-source computer vision framework to build and deploy applications in minutes" @@ -47,6 +47,7 @@ gstreamer-rtsp = "0.21.0" inquire = "0.6.2" tabled = "0.15.0" ctrlc = "3.4.2" +redis = { version = "0.24.0", features = ["aio", "tokio-comp"] } [dependencies.uuid] version = "1.4.1" diff --git a/pipeless/src/cli/start.rs b/pipeless/src/cli/start.rs index 87025fd..54595a2 100644 --- a/pipeless/src/cli/start.rs +++ b/pipeless/src/cli/start.rs @@ -1,5 +1,5 @@ use pyo3; -use std::sync::Arc; +use std::{env, sync::Arc}; use tokio::sync::RwLock; use gstreamer as gst; use glib; @@ -8,7 +8,7 @@ use ctrlc; use crate as pipeless; -pub fn start_pipeless_node(stages_dir: &str) { +pub fn start_pipeless_node(project_dir: &str, export_redis_events: bool) { ctrlc::set_handler(|| { println!("Exiting..."); std::process::exit(0); @@ -24,11 +24,27 @@ pub fn start_pipeless_node(stages_dir: &str) { // Initialize Gstreamer gst::init().expect("Unable to initialize gstreamer"); - let frame_path_executor = Arc::new(RwLock::new(pipeless::stages::path::FramePathExecutor::new(stages_dir))); + let frame_path_executor = Arc::new(RwLock::new(pipeless::stages::path::FramePathExecutor::new(project_dir))); // Init Tokio runtime let tokio_rt = tokio::runtime::Runtime::new().expect("Unable to create Tokio runtime"); 
tokio_rt.block_on(async { + // Create event exporter when enabled + let event_exporter = + if export_redis_events { + let redis_url = env::var("PIPELESS_REDIS_URL") + .expect("Please export the PIPELESS_REDIS_URL environment variable in order to export events to Redis"); + let redis_channel = env::var("PIPELESS_REDIS_CHANNEL") + .expect("Please export the PIPELESS_REDIS_CHANNEL environment variable in order to export events to Redis"); + pipeless::event_exporters::EventExporter::new_redis_exporter(&redis_url, &redis_channel).await + } else { + pipeless::event_exporters::EventExporter::new_none_exporter() + }; + { // Context to lock the global event exporter in order to set it + let mut e_exp = pipeless::event_exporters::EVENT_EXPORTER.lock().await; + *e_exp = event_exporter; + } + let streams_table = Arc::new(RwLock::new(pipeless::config::streams::StreamsTable::new())); let dispatcher = pipeless::dispatcher::Dispatcher::new(streams_table.clone()); let dispatcher_sender = dispatcher.get_sender().clone(); diff --git a/pipeless/src/dispatcher.rs b/pipeless/src/dispatcher.rs index b2291cf..e093840 100644 --- a/pipeless/src/dispatcher.rs +++ b/pipeless/src/dispatcher.rs @@ -27,7 +27,9 @@ pub struct Dispatcher { receiver: tokio_stream::wrappers::UnboundedReceiverStream, } impl Dispatcher { - pub fn new(streams_table: Arc>) -> Self { + pub fn new( + streams_table: Arc>, + ) -> Self { let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::(); Self { sender, @@ -36,7 +38,6 @@ impl Dispatcher { ), streams_table } - } pub fn get_sender(&self) -> tokio::sync::mpsc::UnboundedSender { @@ -66,7 +67,7 @@ impl Dispatcher { pub fn start( dispatcher: Dispatcher, - frame_path_executor_arc: Arc> + frame_path_executor_arc: Arc>, ) { let running_managers: Arc>> = Arc::new(RwLock::new(HashMap::new())); let frame_path_executor_arc = frame_path_executor_arc.clone(); @@ -152,6 +153,7 @@ pub fn start( new_manager.get_pipeline_id().await ) { error!("Error adding new stream to the 
streams config table: {}", err); + pipeless::event_exporters::events::export_stream_start_error_event(entry.get_id()).await; } let mut managers_map_guard = running_managers.write().await; managers_map_guard.insert(new_manager.get_pipeline_id().await, new_manager); @@ -160,6 +162,7 @@ pub fn start( error!("Unable to create new pipeline: {}. Rolling back streams configuration.", err.to_string()); let removed = streams_table_guard.remove(entry.get_id()); if removed.is_none() { warn!("Error rolling back table, entry not found.") }; + pipeless::event_exporters::events::export_stream_start_error_event(entry.get_id()).await; } } }, @@ -195,50 +198,59 @@ pub fn start( } } DispatcherEvent::PipelineFinished(pipeline_id, finish_state) => { - let mut table_write_guard = streams_table.write().await; - let stream_entry_option = table_write_guard.find_by_pipeline_id_mut(pipeline_id); - if let Some(entry) = stream_entry_option { - // Remove the pipeline from the stream entry since it finished - entry.unassign_pipeline(); - - // Update the target state of the stream based on the restart policy - match entry.get_restart_policy() { - pipeless::config::streams::RestartPolicy::Never => { - match finish_state { - pipeless::pipeline::PipelineEndReason::Completed => entry.set_target_state(pipeless::config::streams::StreamEntryState::Completed), - pipeless::pipeline::PipelineEndReason::Error => entry.set_target_state(pipeless::config::streams::StreamEntryState::Error), - pipeless::pipeline::PipelineEndReason::Updated => entry.set_target_state(pipeless::config::streams::StreamEntryState::Running), - } - }, - pipeless::config::streams::RestartPolicy::Always => { - entry.set_target_state(pipeless::config::streams::StreamEntryState::Running); - }, - pipeless::config::streams::RestartPolicy::OnError => { - if finish_state == pipeless::pipeline::PipelineEndReason::Error { - entry.set_target_state(pipeless::config::streams::StreamEntryState::Running); - } else { - 
entry.set_target_state(pipeless::config::streams::StreamEntryState::Error); - } - }, - pipeless::config::streams::RestartPolicy::OnEos => { - if finish_state == pipeless::pipeline::PipelineEndReason::Completed { + let mut stream_uuid: Option = None; + { // context to release the write lock + let mut table_write_guard = streams_table.write().await; + let stream_entry_option = table_write_guard.find_by_pipeline_id_mut(pipeline_id); + if let Some(entry) = stream_entry_option { + stream_uuid = Some(entry.get_id()); + // Remove the pipeline from the stream entry since it finished + entry.unassign_pipeline(); + + // Update the target state of the stream based on the restart policy + match entry.get_restart_policy() { + pipeless::config::streams::RestartPolicy::Never => { + match finish_state { + pipeless::pipeline::PipelineEndReason::Completed => entry.set_target_state(pipeless::config::streams::StreamEntryState::Completed), + pipeless::pipeline::PipelineEndReason::Error => entry.set_target_state(pipeless::config::streams::StreamEntryState::Error), + pipeless::pipeline::PipelineEndReason::Updated => entry.set_target_state(pipeless::config::streams::StreamEntryState::Running), + } + }, + pipeless::config::streams::RestartPolicy::Always => { entry.set_target_state(pipeless::config::streams::StreamEntryState::Running); - } else { - entry.set_target_state(pipeless::config::streams::StreamEntryState::Completed); - } - }, - } + }, + pipeless::config::streams::RestartPolicy::OnError => { + if finish_state == pipeless::pipeline::PipelineEndReason::Error { + entry.set_target_state(pipeless::config::streams::StreamEntryState::Running); + } else { + entry.set_target_state(pipeless::config::streams::StreamEntryState::Error); + } + }, + pipeless::config::streams::RestartPolicy::OnEos => { + if finish_state == pipeless::pipeline::PipelineEndReason::Completed { + entry.set_target_state(pipeless::config::streams::StreamEntryState::Running); + } else { + 
entry.set_target_state(pipeless::config::streams::StreamEntryState::Completed); + } + }, + } - // Create new event since we have modified the streams config table - if let Err(err) = dispatcher_sender.send(DispatcherEvent::TableChange) { - warn!("Unable to send dispatcher event for streams table changed. Error: {}", err.to_string()); + // Create new event since we have modified the streams config table + if let Err(err) = dispatcher_sender.send(DispatcherEvent::TableChange) { + warn!("Unable to send dispatcher event for streams table changed. Error: {}", err.to_string()); + } + } else { + warn!(" + Unable to unassign pipeline for stream. Stream entry not found. + Pipeline id: {} + ", pipeline_id); } - } else { - warn!(" - Unable to unassign pipeline for stream. Stream entry not found. - Pipeline id: {} - ", pipeline_id); } + + pipeless::event_exporters::events::export_stream_finished_event( + stream_uuid.unwrap_or_default(), + finish_state.to_string().as_str() + ).await; } } } diff --git a/pipeless/src/event_exporters/events.rs b/pipeless/src/event_exporters/events.rs new file mode 100644 index 0000000..ba51384 --- /dev/null +++ b/pipeless/src/event_exporters/events.rs @@ -0,0 +1,49 @@ +use std::fmt; +use log::warn; + +pub enum EventType { + StreamStartError, + StreamFinished, +} +impl fmt::Display for EventType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + EventType::StreamStartError => write!(f, "StreamStartError"), + EventType::StreamFinished => write!(f, "StreamFinished"), + } + } +} + +/* + * Exports a stream finished event to the external event exporter when it is enabled + */ +pub async fn export_stream_finished_event(stream_uuid: uuid::Uuid, stream_end_state: &str) { + let ext_event: serde_json::Value = serde_json::json!({ + "type": EventType::StreamFinished.to_string(), + "end_state": stream_end_state, + "stream_uuid": stream_uuid.to_string(), + }); + let ext_event_json_str = serde_json::to_string(&ext_event); + if let 
Ok(json_str) = ext_event_json_str { + super::EVENT_EXPORTER.lock().await.publish(&json_str).await; + } else { + warn!("Error serializing event to JSON string, skipping external publishing"); + } +} + +/* + * Exports a stream start error event to the external event exporter when it is enabled + */ +pub async fn export_stream_start_error_event(stream_uuid: uuid::Uuid) { + let ext_event: serde_json::Value = serde_json::json!({ + "type": EventType::StreamStartError.to_string(), + "end_state": "error", + "stream_uuid": stream_uuid.to_string(), + }); + let ext_event_json_str = serde_json::to_string(&ext_event); + if let Ok(json_str) = ext_event_json_str { + super::EVENT_EXPORTER.lock().await.publish(&json_str).await; + } else { + warn!("Error serializing event to JSON string, skipping external publishing"); + } +} diff --git a/pipeless/src/event_exporters/mod.rs b/pipeless/src/event_exporters/mod.rs new file mode 100644 index 0000000..09590f6 --- /dev/null +++ b/pipeless/src/event_exporters/mod.rs @@ -0,0 +1,66 @@ +use std::sync::Arc; + +use log::warn; +use redis::AsyncCommands; +use lazy_static::lazy_static; +use tokio::sync::Mutex; + +pub mod events; + +pub enum EventExporterEnum { + Redis(Redis), +} + +/* + * General event exporter that wraps many types of event exporters + * We cannot use the typical Box to create a common interface because trait methods cannot be async so we just create variants and invoke their methods + */ +pub struct EventExporter { + exporter: Option, +} +impl EventExporter { + pub fn new_none_exporter() -> Self { + Self { exporter: None } + } + pub async fn new_redis_exporter(redis_url: &str, channel: &str) -> Self { + Self { + exporter: Some(EventExporterEnum::Redis(Redis::new(redis_url, channel).await)), + } + } + pub async fn publish(&mut self, message: &str) { + if let Some(exporter) = &mut self.exporter { + match exporter { + EventExporterEnum::Redis(pblsr) => pblsr.publish(message).await, + } + } + } +} + +/* + * Redis event exporter + */ 
+pub struct Redis { + connection: redis::aio::Connection, + channel: String, +} +impl Redis { + async fn new(redis_url: &str, channel: &str) -> Self { + let client = redis::Client::open(redis_url).expect("Unable to create Redis client with the provided URL, please check the value of the PIPELESS_REDIS_URL env var"); + let con = client.get_tokio_connection().await.expect("Failed to connect to Redis"); + + Self { connection: con, channel: channel.to_owned() } + } + + async fn publish(&mut self, message: &str) { + if let Err(err) = self.connection.publish::<&str, &str, i32>(&self.channel, message).await { + warn!("Error publishing message to Redis: {}", err.to_string()); + } + } +} + +// Create global variable to access the event exporter from any point of the code +// It uses an Arc to be shared among threads and a Mutex since the connection is updated on every push +lazy_static! { + pub static ref EVENT_EXPORTER: Arc> = + Arc::new(Mutex::new(EventExporter::new_none_exporter())); +} diff --git a/pipeless/src/lib.rs b/pipeless/src/lib.rs index eefeff1..a52c47c 100644 --- a/pipeless/src/lib.rs +++ b/pipeless/src/lib.rs @@ -20,3 +20,4 @@ pub mod dispatcher; pub mod stages; pub mod cli; pub mod kvs; +pub mod event_exporters; diff --git a/pipeless/src/main.rs b/pipeless/src/main.rs index 7074250..4181505 100644 --- a/pipeless/src/main.rs +++ b/pipeless/src/main.rs @@ -85,9 +85,12 @@ enum Commands { }, /// Start the pipeless node Start { - /// Read stages from the specified directory - #[arg(short, long)] - stages_dir: String, + /// Pipeless project directory + #[clap(short, long, alias = "stages-dir")] + project_dir: String, + /// Enable event export via Redis + #[clap(short, long)] + export_events_redis: bool, }, /// Add resources such as streams Add { @@ -121,7 +124,7 @@ fn main() { match &cli.command { Some(Commands::Init { project_name , template}) => pipeless_ai::cli::init::init(&project_name, template), - Some(Commands::Start { stages_dir }) => 
pipeless_ai::cli::start::start_pipeless_node(&stages_dir), + Some(Commands::Start { project_dir , export_events_redis }) => pipeless_ai::cli::start::start_pipeless_node(&project_dir, *export_events_redis), Some(Commands::Add { command }) => { match &command { Some(AddCommand::Stream { input_uri, output_uri, frame_path , restart_policy}) => pipeless_ai::cli::streams::add(input_uri, output_uri, frame_path, restart_policy), diff --git a/pipeless/src/pipeline.rs b/pipeless/src/pipeline.rs index 3ec427a..cfecb3a 100644 --- a/pipeless/src/pipeline.rs +++ b/pipeless/src/pipeline.rs @@ -1,7 +1,7 @@ use uuid; use tokio; use tokio::sync::RwLock; -use std::sync::Arc; +use std::{fmt, sync::Arc}; use log::{info, error, warn}; use crate as pipeless; @@ -31,12 +31,21 @@ impl From for PipelineError { } } -#[derive(Eq,PartialEq)] +#[derive(Debug,Eq,PartialEq)] pub enum PipelineEndReason { Completed, // End of stream Error, Updated, // When the user changes the stream definition } +impl fmt::Display for PipelineEndReason { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + PipelineEndReason::Completed => write!(f, "completed"), + PipelineEndReason::Error => write!(f, "error"), + PipelineEndReason::Updated => write!(f, "updated"), + } + } +} /// A Pipeless pipeline is an association of an input pipeline and an /// output pipeline, plus the stages the frames must pass through diff --git a/pipeless/src/stages/path.rs b/pipeless/src/stages/path.rs index eb5a770..f873919 100644 --- a/pipeless/src/stages/path.rs +++ b/pipeless/src/stages/path.rs @@ -43,9 +43,9 @@ pub struct FramePathExecutor { stages: HashMap, } impl FramePathExecutor { - pub fn new(stages_dir: &str) -> Self { + pub fn new(project_dir: &str) -> Self { Self { - stages: pipeless::stages::parser::load_stages(stages_dir) + stages: pipeless::stages::parser::load_stages(project_dir) } }