From cfe3498d715878d408c6fc587613ca14ca671761 Mon Sep 17 00:00:00 2001 From: "Miguel A. Cabrera Minagorri" Date: Tue, 23 Jan 2024 19:01:53 +0100 Subject: [PATCH 1/4] Deprecate stages-dir in favor of project-dir Signed-off-by: Miguel A. Cabrera Minagorri --- pipeless/src/main.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/pipeless/src/main.rs b/pipeless/src/main.rs index 7074250..18086a3 100644 --- a/pipeless/src/main.rs +++ b/pipeless/src/main.rs @@ -85,9 +85,12 @@ enum Commands { }, /// Start the pipeless node Start { - /// Read stages from the specified directory - #[arg(short, long)] - stages_dir: String, + /// Pipeless project directory + #[clap(short, long, alias = "stages-dir")] + project_dir: String, + /// Enable event export via Redis + #[clap(short, long)] + export_redis_events: bool, }, /// Add resources such as streams Add { @@ -121,7 +124,7 @@ fn main() { match &cli.command { Some(Commands::Init { project_name , template}) => pipeless_ai::cli::init::init(&project_name, template), - Some(Commands::Start { stages_dir }) => pipeless_ai::cli::start::start_pipeless_node(&stages_dir), + Some(Commands::Start { project_dir , export_redis_events }) => pipeless_ai::cli::start::start_pipeless_node(&project_dir, *export_redis_events), Some(Commands::Add { command }) => { match &command { Some(AddCommand::Stream { input_uri, output_uri, frame_path , restart_policy}) => pipeless_ai::cli::streams::add(input_uri, output_uri, frame_path, restart_policy), From 636b7c41bc8340fd420f2a9f4872fe4d08205ec4 Mon Sep 17 00:00:00 2001 From: "Miguel A. Cabrera Minagorri" Date: Tue, 23 Jan 2024 19:02:31 +0100 Subject: [PATCH 2/4] feat: Add Redis event exporter Signed-off-by: Miguel A. Cabrera Minagorri --- pipeless/Cargo.lock | 57 ++++++++++++++- pipeless/Cargo.toml | 3 +- pipeless/src/cli/start.rs | 20 ++++-- pipeless/src/dispatcher.rs | 106 +++++++++++++++++----------- pipeless/src/event_exporters/mod.rs | 53 ++++++++++++++ pipeless/src/lib.rs | 1 + pipeless/src/main.rs | 2 +- pipeless/src/pipeline.rs | 13 +++- pipeless/src/stages/path.rs | 4 +- 9 files changed, 205 insertions(+), 54 deletions(-) create mode 100644 pipeless/src/event_exporters/mod.rs diff --git a/pipeless/Cargo.lock b/pipeless/Cargo.lock index 2ea425d..cd92b73 100644 --- a/pipeless/Cargo.lock +++ b/pipeless/Cargo.lock @@ -74,6 +74,17 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "async-trait" +version = "0.1.74" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a66537f1bb974b254c98ed142ff995236e81b9d0fe4db0575f46612cb15eb0f9" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.38", +] + [[package]] name = "atomic_refcell" version = "0.1.13" @@ -220,6 +231,20 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" +[[package]] +name = "combine" +version = "4.6.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35ed6e9d84f0b51a7f52daf1c7d71dd136fd7a3f41a8462b8cdb8c78d920fad4" +dependencies = [ + "bytes", + "futures-core", + "memchr", + "pin-project-lite", + "tokio", + "tokio-util", +] + [[package]] name = "core-foundation" version = "0.9.3" @@ -1488,7 +1513,7 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "pipeless-ai" -version = "1.6.3" +version = "1.7.0" dependencies = [ "clap", "ctrlc", @@ -1508,6 +1533,7 @@ dependencies = [ "ort", "pyo3", "rayon", + "redis", "reqwest", "serde", "serde_derive", @@ -1591,7 +1617,7 @@ dependencies = [ "indoc", "libc", "memoffset", - "parking_lot 0.11.2", + "parking_lot 0.12.1", "pyo3-build-config", "pyo3-ffi", "pyo3-macros", @@ -1707,6 +1733,27 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "redis" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c580d9cbbe1d1b479e8d67cf9daf6a62c957e6846048408b80b43ac3f6af84cd" +dependencies = [ + "async-trait", + "bytes", + "combine", + "futures-util", + "itoa", + "percent-encoding", + "pin-project-lite", + "ryu", + "sha1_smol", + "socket2 0.4.10", + "tokio", + "tokio-util", + "url", +] + [[package]] name = "redox_syscall" version = "0.2.16" @@ -1994,6 +2041,12 @@ dependencies = [ "digest", ] +[[package]] +name = "sha1_smol" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012" + [[package]] name = "signal-hook" version = "0.3.17" diff --git a/pipeless/Cargo.toml b/pipeless/Cargo.toml index f3718ae..c92ee92 100644 --- a/pipeless/Cargo.toml +++ b/pipeless/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "pipeless-ai" -version = "1.6.3" +version = "1.7.0" edition = "2021" authors = ["Miguel A. Cabrera Minagorri"] description = "An open-source computer vision framework to build and deploy applications in minutes" @@ -47,6 +47,7 @@ gstreamer-rtsp = "0.21.0" inquire = "0.6.2" tabled = "0.15.0" ctrlc = "3.4.2" +redis = { version = "0.24.0", features = ["aio", "tokio-comp"] } [dependencies.uuid] version = "1.4.1" diff --git a/pipeless/src/cli/start.rs b/pipeless/src/cli/start.rs index 87025fd..686ebca 100644 --- a/pipeless/src/cli/start.rs +++ b/pipeless/src/cli/start.rs @@ -1,5 +1,5 @@ use pyo3; -use std::sync::Arc; +use std::{env, sync::Arc}; use tokio::sync::RwLock; use gstreamer as gst; use glib; @@ -8,7 +8,7 @@ use ctrlc; use crate as pipeless; -pub fn start_pipeless_node(stages_dir: &str) { +pub fn start_pipeless_node(project_dir: &str, export_redis_events: bool) { ctrlc::set_handler(|| { println!("Exiting..."); std::process::exit(0); @@ -24,15 +24,27 @@ pub fn start_pipeless_node(stages_dir: &str) { // Initialize Gstreamer gst::init().expect("Unable to initialize gstreamer"); - let frame_path_executor = Arc::new(RwLock::new(pipeless::stages::path::FramePathExecutor::new(stages_dir))); + let frame_path_executor = Arc::new(RwLock::new(pipeless::stages::path::FramePathExecutor::new(project_dir))); // Init Tokio runtime let tokio_rt = tokio::runtime::Runtime::new().expect("Unable to create Tokio runtime"); tokio_rt.block_on(async { + // Create event exporter when enabled + let event_exporter = + if export_redis_events { + let redis_url = env::var("PIPELESS_REDIS_URL") + .expect("Please export the PIPELESS_REDIS_URL environment variable in order to export events to Redis"); + let redis_channel = env::var("PIPELESS_REDIS_CHANNEL") + .expect("Please export the PIPELESS_REDIS_CHANNEL environment variable in order to export events to Redis"); + pipeless::event_exporters::EventExporter::new_redis_exporter(&redis_url, &redis_channel).await + } else { + pipeless::event_exporters::EventExporter::new_none_exporter() + }; + let streams_table = Arc::new(RwLock::new(pipeless::config::streams::StreamsTable::new())); let dispatcher = pipeless::dispatcher::Dispatcher::new(streams_table.clone()); let dispatcher_sender = dispatcher.get_sender().clone(); - pipeless::dispatcher::start(dispatcher, frame_path_executor); + pipeless::dispatcher::start(dispatcher, frame_path_executor, event_exporter); // Use the REST adapter to manage streams let rest_adapter = pipeless::config::adapters::rest::RestAdapter::new(streams_table.clone()); diff --git a/pipeless/src/dispatcher.rs b/pipeless/src/dispatcher.rs index b2291cf..b8fade2 100644 --- a/pipeless/src/dispatcher.rs +++ b/pipeless/src/dispatcher.rs @@ -27,7 +27,9 @@ pub struct Dispatcher { receiver: tokio_stream::wrappers::UnboundedReceiverStream, } impl Dispatcher { - pub fn new(streams_table: Arc>) -> Self { + pub fn new( + streams_table: Arc>, + ) -> Self { let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::(); Self { sender, @@ -36,7 +38,6 @@ impl Dispatcher { ), streams_table } - } pub fn get_sender(&self) -> tokio::sync::mpsc::UnboundedSender { @@ -66,15 +67,18 @@ impl Dispatcher { pub fn start( dispatcher: Dispatcher, - frame_path_executor_arc: Arc> + frame_path_executor_arc: Arc>, + event_exporter: pipeless::event_exporters::EventExporter, ) { let running_managers: Arc>> = Arc::new(RwLock::new(HashMap::new())); let frame_path_executor_arc = frame_path_executor_arc.clone(); + let event_exporter_arc = Arc::new(tokio::sync::Mutex::new(event_exporter)); tokio::spawn(async move { let running_managers = running_managers.clone(); let dispatcher_sender = dispatcher.get_sender().clone(); let streams_table = dispatcher.get_streams_table().clone(); + let event_exporter_arc = event_exporter_arc.clone(); // Process events forever let concurrent_limit = 3; dispatcher.process_events(concurrent_limit, move |event, _end_signal| { @@ -82,6 +86,7 @@ pub fn start( let running_managers = running_managers.clone(); let dispatcher_sender = dispatcher_sender.clone(); let streams_table = streams_table.clone(); + let event_exporter_arc = event_exporter_arc.clone(); async move { match event { DispatcherEvent::TableChange => { @@ -195,49 +200,66 @@ pub fn start( } } DispatcherEvent::PipelineFinished(pipeline_id, finish_state) => { - let mut table_write_guard = streams_table.write().await; - let stream_entry_option = table_write_guard.find_by_pipeline_id_mut(pipeline_id); - if let Some(entry) = stream_entry_option { - // Remove the pipeline from the stream entry since it finished - entry.unassign_pipeline(); - - // Update the target state of the stream based on the restart policy - match entry.get_restart_policy() { - pipeless::config::streams::RestartPolicy::Never => { - match finish_state { - pipeless::pipeline::PipelineEndReason::Completed => entry.set_target_state(pipeless::config::streams::StreamEntryState::Completed), - pipeless::pipeline::PipelineEndReason::Error => entry.set_target_state(pipeless::config::streams::StreamEntryState::Error), - pipeless::pipeline::PipelineEndReason::Updated => entry.set_target_state(pipeless::config::streams::StreamEntryState::Running), - } - }, - pipeless::config::streams::RestartPolicy::Always => { - entry.set_target_state(pipeless::config::streams::StreamEntryState::Running); - }, - pipeless::config::streams::RestartPolicy::OnError => { - if finish_state == pipeless::pipeline::PipelineEndReason::Error { - entry.set_target_state(pipeless::config::streams::StreamEntryState::Running); - } else { - entry.set_target_state(pipeless::config::streams::StreamEntryState::Error); - } - }, - pipeless::config::streams::RestartPolicy::OnEos => { - if finish_state == pipeless::pipeline::PipelineEndReason::Completed { + let mut stream_id: Option = None; + { // context to release the write lock + let mut table_write_guard = streams_table.write().await; + let stream_entry_option = table_write_guard.find_by_pipeline_id_mut(pipeline_id); + if let Some(entry) = stream_entry_option { + stream_id = Some(entry.get_id()); + // Remove the pipeline from the stream entry since it finished + entry.unassign_pipeline(); + + // Update the target state of the stream based on the restart policy + match entry.get_restart_policy() { + pipeless::config::streams::RestartPolicy::Never => { + match finish_state { + pipeless::pipeline::PipelineEndReason::Completed => entry.set_target_state(pipeless::config::streams::StreamEntryState::Completed), + pipeless::pipeline::PipelineEndReason::Error => entry.set_target_state(pipeless::config::streams::StreamEntryState::Error), + pipeless::pipeline::PipelineEndReason::Updated => entry.set_target_state(pipeless::config::streams::StreamEntryState::Running), + } + }, + pipeless::config::streams::RestartPolicy::Always => { entry.set_target_state(pipeless::config::streams::StreamEntryState::Running); - } else { - entry.set_target_state(pipeless::config::streams::StreamEntryState::Completed); - } - }, - } + }, + pipeless::config::streams::RestartPolicy::OnError => { + if finish_state == pipeless::pipeline::PipelineEndReason::Error { + entry.set_target_state(pipeless::config::streams::StreamEntryState::Running); + } else { + entry.set_target_state(pipeless::config::streams::StreamEntryState::Error); + } + }, + pipeless::config::streams::RestartPolicy::OnEos => { + if finish_state == pipeless::pipeline::PipelineEndReason::Completed { + entry.set_target_state(pipeless::config::streams::StreamEntryState::Running); + } else { + entry.set_target_state(pipeless::config::streams::StreamEntryState::Completed); + } + }, + } - // Create new event since we have modified the streams config table - if let Err(err) = dispatcher_sender.send(DispatcherEvent::TableChange) { - warn!("Unable to send dispatcher event for streams table changed. Error: {}", err.to_string()); + // Create new event since we have modified the streams config table + if let Err(err) = dispatcher_sender.send(DispatcherEvent::TableChange) { + warn!("Unable to send dispatcher event for streams table changed. Error: {}", err.to_string()); + } + } else { + warn!(" + Unable to unassign pipeline for stream. Stream entry not found. + Pipeline id: {} + ", pipeline_id); } + } + + // Export the event + let ext_event: serde_json::Value = serde_json::json!({ + "type": "StreamFinished", + "end_state": finish_state.to_string(), + "stream_id": stream_id.unwrap_or_default(), + }); + let ext_event_json_str = serde_json::to_string(&ext_event); + if let Ok(json_str) = ext_event_json_str { + event_exporter_arc.lock().await.publish(&json_str).await; } else { - warn!(" - Unable to unassign pipeline for stream. Stream entry not found. - Pipeline id: {} - ", pipeline_id); + warn!("Error serializing event to JSON string, skipping external publishing"); } } } diff --git a/pipeless/src/event_exporters/mod.rs b/pipeless/src/event_exporters/mod.rs new file mode 100644 index 0000000..865f24c --- /dev/null +++ b/pipeless/src/event_exporters/mod.rs @@ -0,0 +1,53 @@ +use log::warn; +use redis::AsyncCommands; + +pub enum EventExporterEnum { + Redis(Redis), +} + +/* + * General event exporter that wraps many types of event exporters + * We cannot use the typical Box to create a common interface because trait methods cannot be async so we just create variants and invoke theit methods + */ +pub struct EventExporter { + exporter: Option, +} +impl EventExporter { + pub fn new_none_exporter() -> Self { + Self { exporter: None } + } + pub async fn new_redis_exporter(redis_url: &str, channel: &str) -> Self { + Self { + exporter: Some(EventExporterEnum::Redis(Redis::new(redis_url, channel).await)), + } + } + pub async fn publish(&mut self, message: &str) { + if let Some(exporter) = &mut self.exporter { + match exporter { + EventExporterEnum::Redis(pblsr) => pblsr.publish(message).await, + } + } + } +} + +/* + * Redis event exporter + */ +pub struct Redis { + connection: redis::aio::Connection, + channel: String, +} +impl Redis { + async fn new(redis_url: &str, channel: &str) -> Self { + let client = redis::Client::open(redis_url).expect("Unable to create Redis client with the provided URL, please check the value of the PIPELESS_REDIS_URL env var"); + let con = client.get_tokio_connection().await.expect("Failed to connect to Redis"); + + Self { connection: con, channel: channel.to_owned() } + } + + async fn publish(&mut self, message: &str) { + if let Err(err) = self.connection.publish::<&str, &str, i32>(&self.channel, message).await { + warn!("Error publishing message to Redis: {}", err.to_string()); + } + } +} diff --git a/pipeless/src/lib.rs b/pipeless/src/lib.rs index eefeff1..a52c47c 100644 --- a/pipeless/src/lib.rs +++ b/pipeless/src/lib.rs @@ -20,3 +20,4 @@ pub mod dispatcher; pub mod stages; pub mod cli; pub mod kvs; +pub mod event_exporters; diff --git a/pipeless/src/main.rs b/pipeless/src/main.rs index 18086a3..1cb29ad 100644 --- a/pipeless/src/main.rs +++ b/pipeless/src/main.rs @@ -90,7 +90,7 @@ enum Commands { project_dir: String, /// Enable event export via Redis #[clap(short, long)] - export_redis_events: bool, + export_events_redis: bool, }, /// Add resources such as streams Add { diff --git a/pipeless/src/pipeline.rs b/pipeless/src/pipeline.rs index 3ec427a..cfecb3a 100644 --- a/pipeless/src/pipeline.rs +++ b/pipeless/src/pipeline.rs @@ -1,7 +1,7 @@ use uuid; use tokio; use tokio::sync::RwLock; -use std::sync::Arc; +use std::{fmt, sync::Arc}; use log::{info, error, warn}; use crate as pipeless; @@ -31,12 +31,21 @@ impl From for PipelineError { } } -#[derive(Eq,PartialEq)] +#[derive(Debug,Eq,PartialEq)] pub enum PipelineEndReason { Completed, // End of stream Error, Updated, // When the user changes the stream definition } +impl fmt::Display for PipelineEndReason { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + PipelineEndReason::Completed => write!(f, "completed"), + PipelineEndReason::Error => write!(f, "error"), + PipelineEndReason::Updated => write!(f, "updated"), + } + } +} /// A Pipeless pipeline is an association of an input pipeline and an /// output pipeline, plus the stages the frames must pass through diff --git a/pipeless/src/stages/path.rs b/pipeless/src/stages/path.rs index eb5a770..f873919 100644 --- a/pipeless/src/stages/path.rs +++ b/pipeless/src/stages/path.rs @@ -43,9 +43,9 @@ pub struct FramePathExecutor { stages: HashMap, } impl FramePathExecutor { - pub fn new(stages_dir: &str) -> Self { + pub fn new(project_dir: &str) -> Self { Self { - stages: pipeless::stages::parser::load_stages(stages_dir) + stages: pipeless::stages::parser::load_stages(project_dir) } } From ccb5a3d6946d024514f582dac5c76a5cec2895b2 Mon Sep 17 00:00:00 2001 From: "Miguel A. Cabrera Minagorri" Date: Wed, 24 Jan 2024 15:50:11 +0100 Subject: [PATCH 3/4] Use stream_uuid instead of stream_id Signed-off-by: Miguel A. Cabrera Minagorri --- pipeless/src/dispatcher.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pipeless/src/dispatcher.rs b/pipeless/src/dispatcher.rs index b8fade2..571d638 100644 --- a/pipeless/src/dispatcher.rs +++ b/pipeless/src/dispatcher.rs @@ -200,12 +200,12 @@ pub fn start( } } DispatcherEvent::PipelineFinished(pipeline_id, finish_state) => { - let mut stream_id: Option = None; + let mut stream_uuid: Option = None; { // context to release the write lock let mut table_write_guard = streams_table.write().await; let stream_entry_option = table_write_guard.find_by_pipeline_id_mut(pipeline_id); if let Some(entry) = stream_entry_option { - stream_id = Some(entry.get_id()); + stream_uuid = Some(entry.get_id()); // Remove the pipeline from the stream entry since it finished entry.unassign_pipeline(); @@ -253,7 +253,7 @@ pub fn start( let ext_event: serde_json::Value = serde_json::json!({ "type": "StreamFinished", "end_state": finish_state.to_string(), - "stream_id": stream_id.unwrap_or_default(), + "stream_uuid": stream_uuid.unwrap_or_default(), }); let ext_event_json_str = serde_json::to_string(&ext_event); if let Ok(json_str) = ext_event_json_str { From ce1b28cd2c50330420bb3cfbe0ea8c44e397912c Mon Sep 17 00:00:00 2001 From: "Miguel A. Cabrera Minagorri" Date: Wed, 24 Jan 2024 17:17:11 +0100 Subject: [PATCH 4/4] Make event exporter global Signed-off-by: Miguel A. Cabrera Minagorri --- pipeless/src/cli/start.rs | 6 +++- pipeless/src/dispatcher.rs | 22 ++++-------- pipeless/src/event_exporters/events.rs | 49 ++++++++++++++++++++++++++ pipeless/src/event_exporters/mod.rs | 13 +++++++ pipeless/src/main.rs | 2 +- 5 files changed, 74 insertions(+), 18 deletions(-) create mode 100644 pipeless/src/event_exporters/events.rs diff --git a/pipeless/src/cli/start.rs b/pipeless/src/cli/start.rs index 686ebca..54595a2 100644 --- a/pipeless/src/cli/start.rs +++ b/pipeless/src/cli/start.rs @@ -40,11 +40,15 @@ pub fn start_pipeless_node(project_dir: &str, export_redis_events: bool) { } else { pipeless::event_exporters::EventExporter::new_none_exporter() }; + { // Context to lock the global event exporter in order to set it + let mut e_exp = pipeless::event_exporters::EVENT_EXPORTER.lock().await; + *e_exp = event_exporter; + } let streams_table = Arc::new(RwLock::new(pipeless::config::streams::StreamsTable::new())); let dispatcher = pipeless::dispatcher::Dispatcher::new(streams_table.clone()); let dispatcher_sender = dispatcher.get_sender().clone(); - pipeless::dispatcher::start(dispatcher, frame_path_executor, event_exporter); + pipeless::dispatcher::start(dispatcher, frame_path_executor); // Use the REST adapter to manage streams let rest_adapter = pipeless::config::adapters::rest::RestAdapter::new(streams_table.clone()); diff --git a/pipeless/src/dispatcher.rs b/pipeless/src/dispatcher.rs index 571d638..e093840 100644 --- a/pipeless/src/dispatcher.rs +++ b/pipeless/src/dispatcher.rs @@ -68,17 +68,14 @@ impl Dispatcher { pub fn start( dispatcher: Dispatcher, frame_path_executor_arc: Arc>, - event_exporter: pipeless::event_exporters::EventExporter, ) { let running_managers: Arc>> = Arc::new(RwLock::new(HashMap::new())); let frame_path_executor_arc = frame_path_executor_arc.clone(); - let event_exporter_arc = Arc::new(tokio::sync::Mutex::new(event_exporter)); tokio::spawn(async move { let running_managers = running_managers.clone(); let dispatcher_sender = dispatcher.get_sender().clone(); let streams_table = dispatcher.get_streams_table().clone(); - let event_exporter_arc = event_exporter_arc.clone(); // Process events forever let concurrent_limit = 3; dispatcher.process_events(concurrent_limit, move |event, _end_signal| { @@ -86,7 +83,6 @@ pub fn start( let running_managers = running_managers.clone(); let dispatcher_sender = dispatcher_sender.clone(); let streams_table = streams_table.clone(); - let event_exporter_arc = event_exporter_arc.clone(); async move { match event { DispatcherEvent::TableChange => { @@ -157,6 +153,7 @@ pub fn start( new_manager.get_pipeline_id().await ) { error!("Error adding new stream to the streams config table: {}", err); + pipeless::event_exporters::events::export_stream_start_error_event(entry.get_id()).await; } let mut managers_map_guard = running_managers.write().await; managers_map_guard.insert(new_manager.get_pipeline_id().await, new_manager); @@ -165,6 +162,7 @@ pub fn start( error!("Unable to create new pipeline: {}. Rolling back streams configuration.", err.to_string()); let removed = streams_table_guard.remove(entry.get_id()); if removed.is_none() { warn!("Error rolling back table, entry not found.") }; + pipeless::event_exporters::events::export_stream_start_error_event(entry.get_id()).await; } } }, @@ -249,18 +247,10 @@ pub fn start( } } - // Export the event - let ext_event: serde_json::Value = serde_json::json!({ - "type": "StreamFinished", - "end_state": finish_state.to_string(), - "stream_uuid": stream_uuid.unwrap_or_default(), - }); - let ext_event_json_str = serde_json::to_string(&ext_event); - if let Ok(json_str) = ext_event_json_str { - event_exporter_arc.lock().await.publish(&json_str).await; - } else { - warn!("Error serializing event to JSON string, skipping external publishing"); - } + pipeless::event_exporters::events::export_stream_finished_event( + stream_uuid.unwrap_or_default(), + finish_state.to_string().as_str() + ).await; } } } diff --git a/pipeless/src/event_exporters/events.rs b/pipeless/src/event_exporters/events.rs new file mode 100644 index 0000000..ba51384 --- /dev/null +++ b/pipeless/src/event_exporters/events.rs @@ -0,0 +1,49 @@ +use std::fmt; +use log::warn; + +pub enum EventType { + StreamStartError, + StreamFinished, +} +impl fmt::Display for EventType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + EventType::StreamStartError => write!(f, "StreamStartError"), + EventType::StreamFinished => write!(f, "StreamFinished"), + } + } +} + +/* + * Exports a stream finished event to the external event exporter when it is enabled + */ +pub async fn export_stream_finished_event(stream_uuid: uuid::Uuid, stream_end_state: &str) { + let ext_event: serde_json::Value = serde_json::json!({ + "type": EventType::StreamFinished.to_string(), + "end_state": stream_end_state, + "stream_uuid": stream_uuid.to_string(), + }); + let ext_event_json_str = serde_json::to_string(&ext_event); + if let Ok(json_str) = ext_event_json_str { + super::EVENT_EXPORTER.lock().await.publish(&json_str).await; + } else { + warn!("Error serializing event to JSON string, skipping external publishing"); + } +} + +/* + * Exports a stream start error event to the external event exporter when it is enabled + */ +pub async fn export_stream_start_error_event(stream_uuid: uuid::Uuid) { + let ext_event: serde_json::Value = serde_json::json!({ + "type": EventType::StreamStartError.to_string(), + "end_state": "error", + "stream_uuid": stream_uuid.to_string(), + }); + let ext_event_json_str = serde_json::to_string(&ext_event); + if let Ok(json_str) = ext_event_json_str { + super::EVENT_EXPORTER.lock().await.publish(&json_str).await; + } else { + warn!("Error serializing event to JSON string, skipping external publishing"); + } +} diff --git a/pipeless/src/event_exporters/mod.rs b/pipeless/src/event_exporters/mod.rs index 865f24c..09590f6 100644 --- a/pipeless/src/event_exporters/mod.rs +++ b/pipeless/src/event_exporters/mod.rs @@ -1,5 +1,11 @@ +use std::sync::Arc; + use log::warn; use redis::AsyncCommands; +use lazy_static::lazy_static; +use tokio::sync::Mutex; + +pub mod events; pub enum EventExporterEnum { Redis(Redis), @@ -51,3 +57,10 @@ impl Redis { } } } + +// Create global variable to access the event exporter from any point of the code +// It uses an Arc to be shared among threads and a Mutex since the connection is updated on every push +lazy_static! { + pub static ref EVENT_EXPORTER: Arc> = + Arc::new(Mutex::new(EventExporter::new_none_exporter())); +} diff --git a/pipeless/src/main.rs b/pipeless/src/main.rs index 1cb29ad..4181505 100644 --- a/pipeless/src/main.rs +++ b/pipeless/src/main.rs @@ -124,7 +124,7 @@ fn main() { match &cli.command { Some(Commands::Init { project_name , template}) => pipeless_ai::cli::init::init(&project_name, template), - Some(Commands::Start { project_dir , export_redis_events }) => pipeless_ai::cli::start::start_pipeless_node(&project_dir, *export_redis_events), + Some(Commands::Start { project_dir , export_events_redis }) => pipeless_ai::cli::start::start_pipeless_node(&project_dir, *export_events_redis), Some(Commands::Add { command }) => { match &command { Some(AddCommand::Stream { input_uri, output_uri, frame_path , restart_policy}) => pipeless_ai::cli::streams::add(input_uri, output_uri, frame_path, restart_policy),