Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

json stats with added iat/bitrate min/max/avg stats #26

Merged
merged 4 commits into from
Dec 13, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 152 additions & 21 deletions src/bin/probe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,13 @@ struct StreamData {
continuity_counter: u8,
timestamp: u64,
bitrate: u32,
bitrate_max: u32,
bitrate_min: u32,
bitrate_avg: u32,
iat: u64,
iat_max: u64,
iat_min: u64,
iat_avg: u64,
error_count: u32,
last_arrival_time: u64,
start_time: u64, // field for start time
Expand All @@ -77,7 +83,13 @@ impl StreamData {
continuity_counter: u8,
) -> Self {
let bitrate = 0;
let bitrate_max = 0;
let bitrate_min = 0;
let bitrate_avg = 0;
let iat = 0;
let iat_max = 0;
let iat_min = 0;
let iat_avg = 0;
let error_count = 0;
let last_arrival_time = current_unix_timestamp_ms().unwrap_or(0);
StreamData {
Expand All @@ -86,7 +98,13 @@ impl StreamData {
continuity_counter,
timestamp,
bitrate,
bitrate_max,
bitrate_min,
bitrate_avg,
iat,
iat_max,
iat_min,
iat_avg,
error_count,
last_arrival_time,
start_time, // Initialize start time
Expand All @@ -103,6 +121,19 @@ impl StreamData {
if elapsed_time_ms > 0 {
let elapsed_time_sec = elapsed_time_ms as f64 / 1000.0;
self.bitrate = (self.total_bits as f64 / elapsed_time_sec) as u32;

// Bitrate max
if self.bitrate > self.bitrate_max {
self.bitrate_max = self.bitrate;
}

// Bitrate min
if self.bitrate < self.bitrate_min {
self.bitrate_min = self.bitrate;
}

// Bitrate avg
self.bitrate_avg = (self.bitrate_avg + self.bitrate) / 2;
}

self.total_bits += bits; // Accumulate total bits
Expand All @@ -111,6 +142,19 @@ impl StreamData {
let iat = arrival_time - self.last_arrival_time;
self.iat = iat;

// IAT max
if iat > self.iat_max {
self.iat_max = iat;
}

// IAT min
if iat < self.iat_min {
self.iat_min = iat;
}

// IAT avg
self.iat_avg = (self.iat_avg + iat) / 2;

self.last_arrival_time = arrival_time;
}
}
Expand Down Expand Up @@ -177,25 +221,50 @@ fn process_packet(stream_data_packet: &StreamData, errors: &mut Tr101290Errors)
// Use a scope to limit the duration of the lock
{
let mut pid_map = PID_MAP.lock().unwrap();
let mut found_pid = false;

// Check if the PID map already has an entry for this PID
match pid_map.get_mut(&pid) {
Some(stream_data) => {
// Existing StreamData instance found, update it
stream_data.update_stats(packet.len(), arrival_time);
let uptime = arrival_time - stream_data.start_time;
// print stats
debug!("STATS: PID: {}, Type: {}, Bitrate: {} bps, IAT: {} ms, Errors: {}, CC: {}, Timestamp: {} ms Uptime: {} ms",
stream_data.pid, stream_data.stream_type, stream_data.bitrate, stream_data_packet.iat,
stream_data.error_count, stream_data_packet.continuity_counter, stream_data_packet.timestamp, uptime);

// create json object of stats
let json_stats = json!({
"type": "mpegts_stats",
"pid": stream_data.pid,
"stream_type": stream_data.stream_type,
"bitrate": stream_data.bitrate,
"bitrate_max": stream_data.bitrate_max,
"bitrate_min": stream_data.bitrate_min,
"bitrate_avg": stream_data.bitrate_avg,
"iat": stream_data_packet.iat,
"iat_max": stream_data.iat_max,
"iat_min": stream_data.iat_min,
"iat_avg": stream_data.iat_avg,
"errors": stream_data.error_count,
"continuity_counter": stream_data_packet.continuity_counter,
"timestamp": stream_data_packet.timestamp,
"uptime": uptime,
});

// log json stats
debug!("STATS: {}", json_stats);
found_pid = true;
}
None => {
// New StreamData instance needs to be created
//let stream_type = determine_stream_type(pid); // Determine stream type
//let new_stream_data = StreamData::new(packet, pid, stream_type, arrival_time, arrival_time, 0);
//pid_map.insert(pid, new_stream_data);
// No StreamData instance found, log an error
error!("ProcessPacket: PID {} not found in PID map.", pid);
}
}
if !found_pid && pid == 0x1FFF {
// PID not found, add the stream_data_packet to the pid_map
pid_map.insert(
pid,
stream_data_packet.clone(),
);
}
}
}

Expand Down Expand Up @@ -405,19 +474,75 @@ fn update_pid_map(pmt_packet: &[u8]) {

let timestamp = current_unix_timestamp_ms().unwrap_or(0);

debug!(
"UpdatePIDmap: Added Stream PID: {}, Stream Type: {}/{}",
stream_pid, pmt_entry.stream_type, stream_type
);
let stream_data = StreamData::new(
&[],
stream_pid,
stream_type.to_string(),
timestamp,
timestamp,
0,
);
pid_map.insert(stream_pid, stream_data);
if !pid_map.contains_key(&stream_pid) {
debug!(
"UpdatePIDmap: Added Stream PID: {}, Stream Type: {}/{}",
stream_pid, pmt_entry.stream_type, stream_type
);
let stream_data = StreamData::new(
&[],
stream_pid,
stream_type.to_string(),
timestamp,
timestamp,
0,
);

// create json object of stats
let json_stats = json!({
"type": "mpegts_stats",
"pid": stream_data.pid,
"stream_type": stream_data.stream_type,
"bitrate": stream_data.bitrate,
"bitrate_max": stream_data.bitrate_max,
"bitrate_min": stream_data.bitrate_min,
"bitrate_avg": stream_data.bitrate_avg,
"iat": stream_data.iat,
"iat_max": stream_data.iat_max,
"iat_min": stream_data.iat_min,
"iat_avg": stream_data.iat_avg,
"errors": stream_data.error_count,
"continuity_counter": stream_data.continuity_counter,
"timestamp": stream_data.timestamp,
"uptime": 0,
});

// log json stats
debug!("STATS: {}", json_stats);

pid_map.insert(stream_pid, stream_data);
} else {
debug!(
"UpdatePIDmap: PID {} already exists, not adding again.",
stream_pid
);
// get the stream data so we can update it
let stream_data = pid_map.get_mut(&stream_pid).unwrap();
// update the timestamp
stream_data.update_stats(pmt_packet.len(), timestamp);

// create json object of stats
let json_stats = json!({
"type": "mpegts_stats",
"pid": stream_data.pid,
"stream_type": stream_data.stream_type,
"bitrate": stream_data.bitrate,
"bitrate_max": stream_data.bitrate_max,
"bitrate_min": stream_data.bitrate_min,
"bitrate_avg": stream_data.bitrate_avg,
"iat": stream_data.iat,
"iat_max": stream_data.iat_max,
"iat_min": stream_data.iat_min,
"iat_avg": stream_data.iat_avg,
"errors": stream_data.error_count,
"continuity_counter": stream_data.continuity_counter,
"timestamp": stream_data.timestamp,
"uptime": 0,
});

// log json stats
debug!("STATS: {}", json_stats);
}
}
} else {
error!("UpdatePIDmap: Skipping PMT PID: {} as it does not match with current PMT packet PID", pmt_pid);
Expand All @@ -427,6 +552,12 @@ fn update_pid_map(pmt_packet: &[u8]) {

fn determine_stream_type(pid: u16) -> String {
let pid_map = PID_MAP.lock().unwrap();

// check if pid already is mapped, if so return the stream type already stored
if let Some(stream_data) = pid_map.get(&pid) {
return stream_data.stream_type.clone();
}

pid_map
.get(&pid)
.map(|stream_data| stream_data.stream_type.clone())
Expand Down