Optimize Kafka to send from multiple threads
Avoid Kafka bottlenecks: produce messages from spawned Tokio tasks and retry when the producer queue is full, so slow deliveries no longer stall the probe loop.
ltn-chriskennedy committed Apr 21, 2024
1 parent ffdd7d4 commit 2fecb02
Showing 2 changed files with 88 additions and 59 deletions.
8 changes: 4 additions & 4 deletions scripts/setup_env.sh
@@ -1,7 +1,7 @@
-PREFIX=/opt/rsprobe
-GST_PLUGIN_PATH=$PREFIX/lib64/gstreamer-1.0
-LD_LIBRARY_PATH=$PREFIX/lib64:$PREFIX/lib:$LD_LIBRARY_PATH
-PATH=$PREFIX/bin:$PATH
+export PREFIX=/opt/rsprobe
+export GST_PLUGIN_PATH=$PREFIX/lib64/gstreamer-1.0
+export LD_LIBRARY_PATH=$PREFIX/lib64:$PREFIX/lib:$LD_LIBRARY_PATH
+export PATH=$PREFIX/bin:$PATH
 
 # For pkg-config to find .pc files
 export PKG_CONFIG_PATH=$PREFIX/lib64/pkgconfig:$PREFIX/lib/pkgconfig:$PKG_CONFIG_PATH
139 changes: 84 additions & 55 deletions src/bin/probe.rs
@@ -28,7 +28,9 @@ use pcap::{Active, Capture, Device, PacketCodec};
 use rdkafka::admin::{AdminClient, AdminOptions, NewTopic};
 use rdkafka::client::DefaultClientContext;
 use rdkafka::config::ClientConfig;
+use rdkafka::error::KafkaError;
 use rdkafka::producer::{FutureProducer, FutureRecord};
+use rdkafka::types::RDKafkaErrorCode;
 use rsprobe::get_system_stats;
 use rsprobe::stream_data::process_mpegts_packet;
 use rsprobe::stream_data::{
@@ -189,41 +191,6 @@ fn flatten_streams(
     flat_structure
 }
 
-async fn kafka_produce_message(
-    data: Vec<u8>,
-    kafka_server: String,
-    kafka_topic: String,
-    kafka_timeout: u64,
-    key: String,
-    _stream_data_timestamp: i64,
-    producer: FutureProducer,
-) {
-    log::debug!("Service {} sending message", kafka_topic);
-    let kafka_topic = kafka_topic.replace(":", "_").replace(".", "_");
-
-    log::debug!(
-        "Forwarding message for topic {} to Kafka server {:?}",
-        kafka_topic,
-        kafka_server
-    );
-
-    let record = FutureRecord::to(&kafka_topic).payload(&data).key(&key);
-    let delivery_future = producer.send(record, Duration::from_millis(kafka_timeout));
-
-    match delivery_future.await {
-        Ok((partition, offset)) => {
-            log::debug!(
-                "Message delivered to partition {} at offset {}",
-                partition,
-                offset
-            );
-        }
-        Err(e) => {
-            log::error!("Failed to deliver message: {:?}", e);
-        }
-    }
-}
-
 // Define your custom PacketCodec
 pub struct BoxCodec;
 
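The removed `kafka_produce_message` awaited each delivery before returning, so only one message was ever in flight at a time. For comparison, a minimal sketch (not part of this commit) that keeps several deliveries in flight on a single `FutureProducer`; the topic, key, and payload names are placeholders, and the `futures` crate is assumed for `join_all`:

```rust
use std::time::Duration;

use futures::future::join_all; // assumed dependency, for concurrent polling
use rdkafka::producer::{FutureProducer, FutureRecord};

// Sketch only: enqueue a batch of messages and await all acknowledgments
// together instead of one at a time. Topic, key, and payloads are placeholders.
async fn send_batch(producer: &FutureProducer, payloads: Vec<String>) {
    let sends = payloads.iter().map(|payload| {
        let record = FutureRecord::to("example_topic")
            .payload(payload)
            .key("probe");
        producer.send(record, Duration::from_secs(30))
    });
    // join_all polls every delivery future concurrently; awaiting each send
    // inline (as the removed helper did) would serialize them
    for result in join_all(sends).await {
        if let Err((e, _)) = result {
            log::error!("delivery failed: {:?}", e);
        }
    }
}
```

Because `send` is an `async fn`, a delivery makes no progress until it is polled; `join_all` polls the whole set at once, which is what the commit achieves at a larger scale with `tokio::spawn`.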
@@ -464,6 +431,61 @@ fn init_pcap(
     Ok((cap, socket))
 }
 
+async fn create_kafka_producer(kafka_config: &ClientConfig) -> FutureProducer {
+    kafka_config
+        .create()
+        .expect("Failed to create Kafka producer")
+}
+
+async fn send_to_kafka(
+    producer: &FutureProducer,
+    topic: &str,
+    key: &str,
+    payload: &str,
+    timeout: Duration,
+    retry_attempts: usize,
+    retry_delay: Duration,
+) -> Result<(), KafkaError> {
+    let mut attempt = 0;
+    loop {
+        let record = FutureRecord::to(topic).payload(payload).key(key);
+
+        match producer.send(record, timeout).await {
+            Ok((partition, offset)) => {
+                log::debug!(
+                    "Message sent successfully to topic: {}, partition: {}, offset: {}",
+                    topic,
+                    partition,
+                    offset
+                );
+                return Ok(());
+            }
+            Err((KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull), _)) => {
+                attempt += 1;
+                if attempt >= retry_attempts {
+                    log::error!(
+                        "Failed to send message after {} retries. Giving up.",
+                        attempt
+                    );
+                    return Err(KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull));
+                } else {
+                    log::warn!(
+                        "Queue is full. Retrying in {} ms... (attempt {}/{})",
+                        retry_delay.as_millis(),
+                        attempt,
+                        retry_attempts
+                    );
+                    tokio::time::sleep(retry_delay).await;
+                }
+            }
+            Err((err, _)) => {
+                log::error!("Failed to send message: {:?}", err);
+                return Err(err);
+            }
+        }
+    }
+}
+
 /// RsProbe Configuration
 #[derive(Parser, Debug)]
 #[clap(
@@ -930,7 +952,6 @@ async fn rsprobe(running: Arc<AtomicBool>) {
     let kafka_broker_clone = args.kafka_broker.clone();
     let kafka_topic_clone = args.kafka_topic.clone();
     let kafka_topic_clone1 = args.kafka_topic.clone();
-    let kafka_broker_clone1 = args.kafka_broker.clone();
     let kafka_topic_clone2 = args.kafka_topic.clone();
     let kafka_broker_clone2 = args.kafka_broker.clone();
 
@@ -960,9 +981,6 @@

     let admin_client: AdminClient<DefaultClientContext> =
         kafka_conf.create().expect("Failed to create admin client");
-    let producer: FutureProducer = kafka_conf
-        .create()
-        .expect("Failed to create Kafka producer");
 
     // This code block tries to create the topic if it doesn't already exist
     // ignoring errors that indicate existence.
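The topic-creation call that this comment describes is collapsed out of the diff view. For orientation, a sketch of the create-if-absent pattern with rdkafka's admin API; the helper name, partition count, and replication factor are assumptions, not the commit's values:

```rust
use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
use rdkafka::client::DefaultClientContext;

// Sketch only: ask the broker to create the topic and treat a
// "topic already exists" result as success. Partition count and
// replication factor are placeholder values.
async fn ensure_topic(admin: &AdminClient<DefaultClientContext>, topic: &str) {
    let new_topic = NewTopic::new(topic, 1, TopicReplication::Fixed(1));
    match admin.create_topics(&[new_topic], &AdminOptions::new()).await {
        Ok(results) => {
            for result in results {
                if let Err((name, code)) = result {
                    // RDKafkaErrorCode::TopicAlreadyExists lands here and is benign
                    log::debug!("topic {} not created: {:?}", name, code);
                }
            }
        }
        Err(e) => log::error!("create_topics request failed: {:?}", e),
    }
}
```

`create_topics` reports a per-topic `Result`, so existence errors can be ignored individually while transport-level failures still surface through the outer `Err`.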
@@ -1435,25 +1453,36 @@ async fn rsprobe(running: Arc<AtomicBool>) {
             }
         }
 
+        // Inside the loop
         for (_probe_id, probe_data) in averaged_probe_data.iter() {
-            let json_data = serde_json::to_value(probe_data)
+            let json_data = serde_json::to_string(probe_data)
                 .expect("Failed to serialize probe data for Kafka");
 
-            let ser_data = serde_json::to_vec(&json_data)
-                .expect("Failed to serialize json data for Kafka");
-
-            // Produce the message to Kafka
-            let future = kafka_produce_message(
-                ser_data,
-                kafka_broker_clone1.clone(),
-                kafka_topic_clone1.clone(),
-                args.kafka_timeout.clone(),
-                args.kafka_key.clone(),
-                current_unix_timestamp_ms().unwrap_or(0) as i64,
-                producer.clone(),
-            );
-
-            future.await;
+            // Clone the values before moving them into the async block
+            let kafka_conf_clone = kafka_conf.clone();
+            let kafka_topic_clone = kafka_topic_clone1.clone();
+            let kafka_key_clone = args.kafka_key.clone();
+
+            tokio::spawn(async move {
+                let producer_local = create_kafka_producer(&kafka_conf_clone).await;
+                let timeout = Duration::from_secs(30);
+                let retry_attempts = 3;
+                let retry_delay = Duration::from_millis(100);
+
+                if let Err(e) = send_to_kafka(
+                    &producer_local,
+                    &kafka_topic_clone,
+                    &kafka_key_clone,
+                    &json_data,
+                    timeout,
+                    retry_attempts,
+                    retry_delay,
+                )
+                .await
+                {
+                    log::error!("Failed to send message to Kafka: {:?}", e);
+                }
+            });
         }
     }
     batch.clear();
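Taken together, a usage sketch for the new `create_kafka_producer` and `send_to_kafka` helpers; it assumes a Tokio runtime, a broker at localhost:9092, and placeholder topic, key, and payload values:

```rust
use std::time::Duration;

use rdkafka::config::ClientConfig;

// Usage sketch for the helpers added in this commit. The broker address,
// topic, key, and payload are placeholders; create_kafka_producer and
// send_to_kafka are assumed to be in scope from probe.rs.
#[tokio::main]
async fn main() {
    let mut kafka_conf = ClientConfig::new();
    kafka_conf.set("bootstrap.servers", "localhost:9092");

    let producer = create_kafka_producer(&kafka_conf).await;

    if let Err(e) = send_to_kafka(
        &producer,
        "rsprobe_stats",            // placeholder topic
        "probe-key",                // placeholder key
        "{\"demo\":true}",          // placeholder payload
        Duration::from_secs(30),    // delivery timeout
        3,                          // retry attempts on QueueFull
        Duration::from_millis(100), // delay between retries
    )
    .await
    {
        log::error!("giving up on Kafka message: {:?}", e);
    }
}
```

One design note: each task spawned in the loop above builds its own producer. `FutureProducer` is cheap to clone, and clones share the underlying client, so passing a clone of one shared producer into each task is a common alternative when connection count matters.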
