From ce1b28cd2c50330420bb3cfbe0ea8c44e397912c Mon Sep 17 00:00:00 2001
From: "Miguel A. Cabrera Minagorri"
Date: Wed, 24 Jan 2024 17:17:11 +0100
Subject: [PATCH] Make event exporter global

Signed-off-by: Miguel A. Cabrera Minagorri
---
 pipeless/src/cli/start.rs              |  6 +++-
 pipeless/src/dispatcher.rs             | 22 ++++--------
 pipeless/src/event_exporters/events.rs | 49 ++++++++++++++++++++++++++
 pipeless/src/event_exporters/mod.rs    | 13 +++++++
 pipeless/src/main.rs                   |  2 +-
 5 files changed, 74 insertions(+), 18 deletions(-)
 create mode 100644 pipeless/src/event_exporters/events.rs

diff --git a/pipeless/src/cli/start.rs b/pipeless/src/cli/start.rs
index 686ebca..54595a2 100644
--- a/pipeless/src/cli/start.rs
+++ b/pipeless/src/cli/start.rs
@@ -40,11 +40,15 @@ pub fn start_pipeless_node(project_dir: &str, export_redis_events: bool) {
     } else {
         pipeless::event_exporters::EventExporter::new_none_exporter()
     };
+    { // Context to lock the global event exporter in order to set it
+        let mut e_exp = pipeless::event_exporters::EVENT_EXPORTER.lock().await;
+        *e_exp = event_exporter;
+    }
 
     let streams_table = Arc::new(RwLock::new(pipeless::config::streams::StreamsTable::new()));
     let dispatcher = pipeless::dispatcher::Dispatcher::new(streams_table.clone());
     let dispatcher_sender = dispatcher.get_sender().clone();
-    pipeless::dispatcher::start(dispatcher, frame_path_executor, event_exporter);
+    pipeless::dispatcher::start(dispatcher, frame_path_executor);
 
     // Use the REST adapter to manage streams
     let rest_adapter = pipeless::config::adapters::rest::RestAdapter::new(streams_table.clone());
diff --git a/pipeless/src/dispatcher.rs b/pipeless/src/dispatcher.rs
index 571d638..e093840 100644
--- a/pipeless/src/dispatcher.rs
+++ b/pipeless/src/dispatcher.rs
@@ -68,17 +68,14 @@ impl Dispatcher {
 pub fn start(
     dispatcher: Dispatcher,
     frame_path_executor_arc: Arc<RwLock<pipeless::stages::path::FramePathExecutor>>,
-    event_exporter: pipeless::event_exporters::EventExporter,
 ) {
     let running_managers: Arc<RwLock<HashMap<uuid::Uuid, pipeless::pipeline::Manager>>> = Arc::new(RwLock::new(HashMap::new()));
     let frame_path_executor_arc = frame_path_executor_arc.clone();
-    let event_exporter_arc = Arc::new(tokio::sync::Mutex::new(event_exporter));
 
     tokio::spawn(async move {
         let running_managers = running_managers.clone();
         let dispatcher_sender = dispatcher.get_sender().clone();
         let streams_table = dispatcher.get_streams_table().clone();
-        let event_exporter_arc = event_exporter_arc.clone();
         // Process events forever
         let concurrent_limit = 3;
         dispatcher.process_events(concurrent_limit, move |event, _end_signal| {
@@ -86,7 +83,6 @@ pub fn start(
             let running_managers = running_managers.clone();
             let dispatcher_sender = dispatcher_sender.clone();
             let streams_table = streams_table.clone();
-            let event_exporter_arc = event_exporter_arc.clone();
             async move {
                 match event {
                     DispatcherEvent::TableChange => {
@@ -157,6 +153,7 @@ pub fn start(
                                 new_manager.get_pipeline_id().await
                             ) {
                                 error!("Error adding new stream to the streams config table: {}", err);
+                                pipeless::event_exporters::events::export_stream_start_error_event(entry.get_id()).await;
                             }
                             let mut managers_map_guard = running_managers.write().await;
                             managers_map_guard.insert(new_manager.get_pipeline_id().await, new_manager);
@@ -165,6 +162,7 @@ pub fn start(
                                error!("Unable to create new pipeline: {}. Rolling back streams configuration.", err.to_string());
                                let removed = streams_table_guard.remove(entry.get_id());
                                if removed.is_none() { warn!("Error rolling back table, entry not found.") };
+                               pipeless::event_exporters::events::export_stream_start_error_event(entry.get_id()).await;
                            }
                        }
                    },
@@ -249,18 +247,10 @@ pub fn start(
                            }
                        }
 
-                       // Export the event
-                       let ext_event: serde_json::Value = serde_json::json!({
-                           "type": "StreamFinished",
-                           "end_state": finish_state.to_string(),
-                           "stream_uuid": stream_uuid.unwrap_or_default(),
-                       });
-                       let ext_event_json_str = serde_json::to_string(&ext_event);
-                       if let Ok(json_str) = ext_event_json_str {
-                           event_exporter_arc.lock().await.publish(&json_str).await;
-                       } else {
-                           warn!("Error serializing event to JSON string, skipping external publishing");
-                       }
+                       pipeless::event_exporters::events::export_stream_finished_event(
+                           stream_uuid.unwrap_or_default(),
+                           finish_state.to_string().as_str()
+                       ).await;
                    }
                }
            }
diff --git a/pipeless/src/event_exporters/events.rs b/pipeless/src/event_exporters/events.rs
new file mode 100644
index 0000000..ba51384
--- /dev/null
+++ b/pipeless/src/event_exporters/events.rs
@@ -0,0 +1,49 @@
+use std::fmt;
+use log::warn;
+
+pub enum EventType {
+    StreamStartError,
+    StreamFinished,
+}
+impl fmt::Display for EventType {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            EventType::StreamStartError => write!(f, "StreamStartError"),
+            EventType::StreamFinished => write!(f, "StreamFinished"),
+        }
+    }
+}
+
+/*
+ * Exports a stream finished event to the external event exporter when it is enabled
+ */
+pub async fn export_stream_finished_event(stream_uuid: uuid::Uuid, stream_end_state: &str) {
+    let ext_event: serde_json::Value = serde_json::json!({
+        "type": EventType::StreamFinished.to_string(),
+        "end_state": stream_end_state,
+        "stream_uuid": stream_uuid.to_string(),
+    });
+    let ext_event_json_str = serde_json::to_string(&ext_event);
+    if let Ok(json_str) = ext_event_json_str {
+        super::EVENT_EXPORTER.lock().await.publish(&json_str).await;
+    } else {
+        warn!("Error serializing event to JSON string, skipping external publishing");
+    }
+}
+
+/*
+ * Exports a stream start error event to the external event exporter when it is enabled
+ */
+pub async fn export_stream_start_error_event(stream_uuid: uuid::Uuid) {
+    let ext_event: serde_json::Value = serde_json::json!({
+        "type": EventType::StreamStartError.to_string(),
+        "end_state": "error",
+        "stream_uuid": stream_uuid.to_string(),
+    });
+    let ext_event_json_str = serde_json::to_string(&ext_event);
+    if let Ok(json_str) = ext_event_json_str {
+        super::EVENT_EXPORTER.lock().await.publish(&json_str).await;
+    } else {
+        warn!("Error serializing event to JSON string, skipping external publishing");
+    }
+}
diff --git a/pipeless/src/event_exporters/mod.rs b/pipeless/src/event_exporters/mod.rs
index 865f24c..09590f6 100644
--- a/pipeless/src/event_exporters/mod.rs
+++ b/pipeless/src/event_exporters/mod.rs
@@ -1,5 +1,11 @@
+use std::sync::Arc;
+
 use log::warn;
 use redis::AsyncCommands;
+use lazy_static::lazy_static;
+use tokio::sync::Mutex;
+
+pub mod events;
 
 pub enum EventExporterEnum {
     Redis(Redis),
@@ -51,3 +57,10 @@ impl Redis {
         }
     }
 }
+
+// Create global variable to access the event exporter from any point of the code
+// It uses an Arc to be shared among threads and a Mutex since the connection is updated on every push
+lazy_static! {
+    pub static ref EVENT_EXPORTER: Arc<Mutex<EventExporter>> =
+        Arc::new(Mutex::new(EventExporter::new_none_exporter()));
+}
diff --git a/pipeless/src/main.rs b/pipeless/src/main.rs
index 1cb29ad..4181505 100644
--- a/pipeless/src/main.rs
+++ b/pipeless/src/main.rs
@@ -124,7 +124,7 @@ fn main() {
     match &cli.command {
         Some(Commands::Init { project_name , template}) => pipeless_ai::cli::init::init(&project_name, template),
-        Some(Commands::Start { project_dir , export_redis_events }) => pipeless_ai::cli::start::start_pipeless_node(&project_dir, *export_redis_events),
+        Some(Commands::Start { project_dir , export_events_redis }) => pipeless_ai::cli::start::start_pipeless_node(&project_dir, *export_events_redis),
         Some(Commands::Add { command }) => {
             match &command {
                 Some(AddCommand::Stream { input_uri, output_uri, frame_path , restart_policy}) => pipeless_ai::cli::streams::add(input_uri, output_uri, frame_path, restart_policy),
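
Usage note (not part of the diff): with the lazy_static EVENT_EXPORTER in place, any async code inside the crate can publish events without receiving an EventExporter argument. A minimal sketch of that pattern, mirroring event_exporters/events.rs; the function name and payload below are hypothetical, only EVENT_EXPORTER and publish() come from the patch:

    // Hypothetical example: build a JSON payload and push it through the global,
    // mutex-guarded exporter. If the exporter is the "none" variant, publish()
    // is expected to be a no-op, so callers do not need to check whether
    // external event export is enabled.
    pub async fn export_custom_event(kind: &str, stream_uuid: uuid::Uuid) {
        let payload = serde_json::json!({
            "type": kind,
            "stream_uuid": stream_uuid.to_string(),
        });
        if let Ok(json_str) = serde_json::to_string(&payload) {
            pipeless::event_exporters::EVENT_EXPORTER.lock().await.publish(&json_str).await;
        }
    }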