Skip to content

Commit

Permalink
feat: Add Redis event exporter
Browse files Browse the repository at this point in the history
Signed-off-by: Miguel A. Cabrera Minagorri <[email protected]>
  • Loading branch information
miguelaeh committed Jan 29, 2024
1 parent cfe3498 commit 636b7c4
Show file tree
Hide file tree
Showing 9 changed files with 205 additions and 54 deletions.
57 changes: 55 additions & 2 deletions pipeless/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion pipeless/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pipeless-ai"
version = "1.6.3"
version = "1.7.0"
edition = "2021"
authors = ["Miguel A. Cabrera Minagorri"]
description = "An open-source computer vision framework to build and deploy applications in minutes"
Expand Down Expand Up @@ -47,6 +47,7 @@ gstreamer-rtsp = "0.21.0"
inquire = "0.6.2"
tabled = "0.15.0"
ctrlc = "3.4.2"
redis = { version = "0.24.0", features = ["aio", "tokio-comp"] }

[dependencies.uuid]
version = "1.4.1"
Expand Down
20 changes: 16 additions & 4 deletions pipeless/src/cli/start.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use pyo3;
use std::sync::Arc;
use std::{env, sync::Arc};
use tokio::sync::RwLock;
use gstreamer as gst;
use glib;
Expand All @@ -8,7 +8,7 @@ use ctrlc;

use crate as pipeless;

pub fn start_pipeless_node(stages_dir: &str) {
pub fn start_pipeless_node(project_dir: &str, export_redis_events: bool) {
ctrlc::set_handler(|| {
println!("Exiting...");
std::process::exit(0);
Expand All @@ -24,15 +24,27 @@ pub fn start_pipeless_node(stages_dir: &str) {
// Initialize Gstreamer
gst::init().expect("Unable to initialize gstreamer");

let frame_path_executor = Arc::new(RwLock::new(pipeless::stages::path::FramePathExecutor::new(stages_dir)));
let frame_path_executor = Arc::new(RwLock::new(pipeless::stages::path::FramePathExecutor::new(project_dir)));

// Init Tokio runtime
let tokio_rt = tokio::runtime::Runtime::new().expect("Unable to create Tokio runtime");
tokio_rt.block_on(async {
// Create event exporter when enabled
let event_exporter =
if export_redis_events {
let redis_url = env::var("PIPELESS_REDIS_URL")
.expect("Please export the PIPELESS_REDIS_URL environment variable in order to export events to Redis");
let redis_channel = env::var("PIPELESS_REDIS_CHANNEL")
.expect("Please export the PIPELESS_REDIS_CHANNEL environment variable in order to export events to Redis");
pipeless::event_exporters::EventExporter::new_redis_exporter(&redis_url, &redis_channel).await
} else {
pipeless::event_exporters::EventExporter::new_none_exporter()
};

let streams_table = Arc::new(RwLock::new(pipeless::config::streams::StreamsTable::new()));
let dispatcher = pipeless::dispatcher::Dispatcher::new(streams_table.clone());
let dispatcher_sender = dispatcher.get_sender().clone();
pipeless::dispatcher::start(dispatcher, frame_path_executor);
pipeless::dispatcher::start(dispatcher, frame_path_executor, event_exporter);

// Use the REST adapter to manage streams
let rest_adapter = pipeless::config::adapters::rest::RestAdapter::new(streams_table.clone());
Expand Down
106 changes: 64 additions & 42 deletions pipeless/src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ pub struct Dispatcher {
receiver: tokio_stream::wrappers::UnboundedReceiverStream<DispatcherEvent>,
}
impl Dispatcher {
pub fn new(streams_table: Arc<RwLock<pipeless::config::streams::StreamsTable>>) -> Self {
pub fn new(
streams_table: Arc<RwLock<pipeless::config::streams::StreamsTable>>,
) -> Self {
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<DispatcherEvent>();
Self {
sender,
Expand All @@ -36,7 +38,6 @@ impl Dispatcher {
),
streams_table
}

}

pub fn get_sender(&self) -> tokio::sync::mpsc::UnboundedSender<DispatcherEvent> {
Expand Down Expand Up @@ -66,22 +67,26 @@ impl Dispatcher {

pub fn start(
dispatcher: Dispatcher,
frame_path_executor_arc: Arc<RwLock<pipeless::stages::path::FramePathExecutor>>
frame_path_executor_arc: Arc<RwLock<pipeless::stages::path::FramePathExecutor>>,
event_exporter: pipeless::event_exporters::EventExporter,
) {
let running_managers: Arc<RwLock<HashMap<uuid::Uuid, pipeless::pipeline::Manager>>> = Arc::new(RwLock::new(HashMap::new()));
let frame_path_executor_arc = frame_path_executor_arc.clone();
let event_exporter_arc = Arc::new(tokio::sync::Mutex::new(event_exporter));

tokio::spawn(async move {
let running_managers = running_managers.clone();
let dispatcher_sender = dispatcher.get_sender().clone();
let streams_table = dispatcher.get_streams_table().clone();
let event_exporter_arc = event_exporter_arc.clone();
// Process events forever
let concurrent_limit = 3;
dispatcher.process_events(concurrent_limit, move |event, _end_signal| {
let frame_path_executor_arc = frame_path_executor_arc.clone();
let running_managers = running_managers.clone();
let dispatcher_sender = dispatcher_sender.clone();
let streams_table = streams_table.clone();
let event_exporter_arc = event_exporter_arc.clone();
async move {
match event {
DispatcherEvent::TableChange => {
Expand Down Expand Up @@ -195,49 +200,66 @@ pub fn start(
}
}
DispatcherEvent::PipelineFinished(pipeline_id, finish_state) => {
let mut table_write_guard = streams_table.write().await;
let stream_entry_option = table_write_guard.find_by_pipeline_id_mut(pipeline_id);
if let Some(entry) = stream_entry_option {
// Remove the pipeline from the stream entry since it finished
entry.unassign_pipeline();

// Update the target state of the stream based on the restart policy
match entry.get_restart_policy() {
pipeless::config::streams::RestartPolicy::Never => {
match finish_state {
pipeless::pipeline::PipelineEndReason::Completed => entry.set_target_state(pipeless::config::streams::StreamEntryState::Completed),
pipeless::pipeline::PipelineEndReason::Error => entry.set_target_state(pipeless::config::streams::StreamEntryState::Error),
pipeless::pipeline::PipelineEndReason::Updated => entry.set_target_state(pipeless::config::streams::StreamEntryState::Running),
}
},
pipeless::config::streams::RestartPolicy::Always => {
entry.set_target_state(pipeless::config::streams::StreamEntryState::Running);
},
pipeless::config::streams::RestartPolicy::OnError => {
if finish_state == pipeless::pipeline::PipelineEndReason::Error {
entry.set_target_state(pipeless::config::streams::StreamEntryState::Running);
} else {
entry.set_target_state(pipeless::config::streams::StreamEntryState::Error);
}
},
pipeless::config::streams::RestartPolicy::OnEos => {
if finish_state == pipeless::pipeline::PipelineEndReason::Completed {
let mut stream_id: Option<uuid::Uuid> = None;
{ // context to release the write lock
let mut table_write_guard = streams_table.write().await;
let stream_entry_option = table_write_guard.find_by_pipeline_id_mut(pipeline_id);
if let Some(entry) = stream_entry_option {
stream_id = Some(entry.get_id());
// Remove the pipeline from the stream entry since it finished
entry.unassign_pipeline();

// Update the target state of the stream based on the restart policy
match entry.get_restart_policy() {
pipeless::config::streams::RestartPolicy::Never => {
match finish_state {
pipeless::pipeline::PipelineEndReason::Completed => entry.set_target_state(pipeless::config::streams::StreamEntryState::Completed),
pipeless::pipeline::PipelineEndReason::Error => entry.set_target_state(pipeless::config::streams::StreamEntryState::Error),
pipeless::pipeline::PipelineEndReason::Updated => entry.set_target_state(pipeless::config::streams::StreamEntryState::Running),
}
},
pipeless::config::streams::RestartPolicy::Always => {
entry.set_target_state(pipeless::config::streams::StreamEntryState::Running);
} else {
entry.set_target_state(pipeless::config::streams::StreamEntryState::Completed);
}
},
}
},
pipeless::config::streams::RestartPolicy::OnError => {
if finish_state == pipeless::pipeline::PipelineEndReason::Error {
entry.set_target_state(pipeless::config::streams::StreamEntryState::Running);
} else {
entry.set_target_state(pipeless::config::streams::StreamEntryState::Error);
}
},
pipeless::config::streams::RestartPolicy::OnEos => {
if finish_state == pipeless::pipeline::PipelineEndReason::Completed {
entry.set_target_state(pipeless::config::streams::StreamEntryState::Running);
} else {
entry.set_target_state(pipeless::config::streams::StreamEntryState::Completed);
}
},
}

// Create new event since we have modified the streams config table
if let Err(err) = dispatcher_sender.send(DispatcherEvent::TableChange) {
warn!("Unable to send dispatcher event for streams table changed. Error: {}", err.to_string());
// Create new event since we have modified the streams config table
if let Err(err) = dispatcher_sender.send(DispatcherEvent::TableChange) {
warn!("Unable to send dispatcher event for streams table changed. Error: {}", err.to_string());
}
} else {
warn!("
Unable to unassign pipeline for stream. Stream entry not found.
Pipeline id: {}
", pipeline_id);
}
}

// Export the event
let ext_event: serde_json::Value = serde_json::json!({
"type": "StreamFinished",
"end_state": finish_state.to_string(),
"stream_id": stream_id.unwrap_or_default(),
});
let ext_event_json_str = serde_json::to_string(&ext_event);
if let Ok(json_str) = ext_event_json_str {
event_exporter_arc.lock().await.publish(&json_str).await;
} else {
warn!("
Unable to unassign pipeline for stream. Stream entry not found.
Pipeline id: {}
", pipeline_id);
warn!("Error serializing event to JSON string, skipping external publishing");
}
}
}
Expand Down
53 changes: 53 additions & 0 deletions pipeless/src/event_exporters/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use log::warn;
use redis::AsyncCommands;

pub enum EventExporterEnum {
Redis(Redis),
}

/*
* General event exporter that wraps many types of event exporters
* We cannot use the typical Box<dyn ExporterTrait> to create a common interface because trait methods cannot be async so we just create variants and invoke theit methods
*/
pub struct EventExporter {
exporter: Option<EventExporterEnum>,
}
impl EventExporter {
pub fn new_none_exporter() -> Self {
Self { exporter: None }
}
pub async fn new_redis_exporter(redis_url: &str, channel: &str) -> Self {
Self {
exporter: Some(EventExporterEnum::Redis(Redis::new(redis_url, channel).await)),
}
}
pub async fn publish(&mut self, message: &str) {
if let Some(exporter) = &mut self.exporter {
match exporter {
EventExporterEnum::Redis(pblsr) => pblsr.publish(message).await,
}
}
}
}

/*
* Redis event exporter
*/
pub struct Redis {
connection: redis::aio::Connection,
channel: String,
}
impl Redis {
async fn new(redis_url: &str, channel: &str) -> Self {
let client = redis::Client::open(redis_url).expect("Unable to create Redis client with the provided URL, please check the value of the PIPELESS_REDIS_URL env var");
let con = client.get_tokio_connection().await.expect("Failed to connect to Redis");

Self { connection: con, channel: channel.to_owned() }
}

async fn publish(&mut self, message: &str) {
if let Err(err) = self.connection.publish::<&str, &str, i32>(&self.channel, message).await {
warn!("Error publishing message to Redis: {}", err.to_string());
}
}
}
1 change: 1 addition & 0 deletions pipeless/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ pub mod dispatcher;
pub mod stages;
pub mod cli;
pub mod kvs;
pub mod event_exporters;
2 changes: 1 addition & 1 deletion pipeless/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ enum Commands {
project_dir: String,
/// Enable event export via Redis
#[clap(short, long)]
export_redis_events: bool,
export_events_redis: bool,
},
/// Add resources such as streams
Add {
Expand Down
Loading

0 comments on commit 636b7c4

Please sign in to comment.