Skip to content

Commit

Permalink
Add a tracing warning when a thread blocks steps
Browse files Browse the repository at this point in the history
Add a warning to the sim when a given host or client blocks progress in
a simulation run. This works by spawning a background thread for each
run that periodically checks the steps taken by the simulation. If the
number of steps is the same between checks then the thread adds the
tracing info.
  • Loading branch information
Benjscho committed Jan 9, 2024
1 parent 6e59ce7 commit e890f25
Showing 1 changed file with 32 additions and 6 deletions.
38 changes: 32 additions & 6 deletions src/sim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use std::cell::RefCell;
use std::future::Future;
use std::net::IpAddr;
use std::ops::DerefMut;
use std::sync::Arc;
use std::sync::mpsc::TryRecvError;
use std::sync::{Arc, mpsc};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;
use std::time::UNIX_EPOCH;
use tokio::time::Duration;
use tracing::Level;
Expand All @@ -30,7 +33,7 @@ pub struct Sim<'a> {
/// Simulation elapsed time
elapsed: Duration,

steps: usize,
steps: Arc<AtomicUsize>,
}

impl<'a> Sim<'a> {
Expand All @@ -46,7 +49,7 @@ impl<'a> Sim<'a> {
rts: IndexMap::new(),
since_epoch,
elapsed: Duration::ZERO,
steps: 1, // bumped after each step
steps: Arc::new(1.into()), // bumped after each step
}
}

Expand Down Expand Up @@ -309,10 +312,33 @@ impl<'a> Sim<'a> {
/// Executes a simple event loop that calls [step](#method.step) each iteration,
/// returning early if any host software errors.
pub fn run(&mut self) -> Result {
let steps = self.steps.clone();
let (tx, rx) = mpsc::channel();
std::thread::spawn(move || {
let mut blocked = false;
loop {
let prev = steps.load(std::sync::atomic::Ordering::Relaxed);
// Exit if main thread has.
match rx.try_recv() {
Ok(_) | Err(TryRecvError::Disconnected) => break,
_ => {}
}
std::thread::sleep(Duration::from_secs(10));
if steps.load(std::sync::atomic::Ordering::Relaxed) == prev {
if !blocked {
tracing::warn!("A task is blocking preventing simulation steps at step {}.", prev);
}
blocked = true;
} else {
blocked = false;
}
}
});
loop {
let is_finished = self.step()?;

if is_finished {
let _ = tx.send(());
return Ok(());
}
}
Expand All @@ -328,7 +354,7 @@ impl<'a> Sim<'a> {
///
/// Returns whether or not all clients have completed.
pub fn step(&mut self) -> Result<bool> {
tracing::debug!("step {}", self.steps);
tracing::debug!("step {}", self.steps.load(Relaxed));

let tick = self.config.tick;
let mut is_finished = true;
Expand Down Expand Up @@ -380,12 +406,12 @@ impl<'a> Sim<'a> {
}

self.elapsed += tick;
self.steps += 1;
let steps = self.steps.fetch_add(1, Relaxed) + 1;

if self.elapsed > self.config.duration && !is_finished {
return Err(format!(
"Ran for duration: {:?} steps: {} without completing",
self.config.duration, self.steps,
self.config.duration, steps,
))?;
}

Expand Down

0 comments on commit e890f25

Please sign in to comment.