Skip to content

Commit

Permalink
concurrency: Generalize UnblockCallback to MachineCallback
Browse files Browse the repository at this point in the history
    * Introduce UnblockKind enum to represent operation outcomes
    * Consolidate unblock/timeout methods into single callback interface
    * Update thread blocking system to use new callback mechanism
    * Refactor mutex and condvar implementations for new callback pattern

Signed-off-by: shamb0 <[email protected]>
  • Loading branch information
shamb0 committed Dec 29, 2024
1 parent a2465fa commit 42ef1d7
Show file tree
Hide file tree
Showing 10 changed files with 186 additions and 130 deletions.
90 changes: 51 additions & 39 deletions src/concurrency/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,9 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
mutex_ref: MutexRef,
retval_dest: Option<(Scalar, MPlaceTy<'tcx>)>,
}
@unblock = |this| {
|this, unblock: UnblockKind| {
assert_eq!(unblock, UnblockKind::Ready);

assert!(!this.mutex_is_locked(&mutex_ref));
this.mutex_lock(&mutex_ref);

Expand Down Expand Up @@ -538,7 +540,8 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
retval: Scalar,
dest: MPlaceTy<'tcx>,
}
@unblock = |this| {
|this, unblock: UnblockKind| {
assert_eq!(unblock, UnblockKind::Ready);
this.rwlock_reader_lock(id);
this.write_scalar(retval, &dest)?;
interp_ok(())
Expand Down Expand Up @@ -623,7 +626,8 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
retval: Scalar,
dest: MPlaceTy<'tcx>,
}
@unblock = |this| {
|this, unblock: UnblockKind| {
assert_eq!(unblock, UnblockKind::Ready);
this.rwlock_writer_lock(id);
this.write_scalar(retval, &dest)?;
interp_ok(())
Expand Down Expand Up @@ -677,25 +681,29 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
retval_timeout: Scalar,
dest: MPlaceTy<'tcx>,
}
@unblock = |this| {
// The condvar was signaled. Make sure we get the clock for that.
if let Some(data_race) = &this.machine.data_race {
data_race.acquire_clock(
&this.machine.sync.condvars[condvar].clock,
&this.machine.threads,
);
|this, unblock: UnblockKind| {
match unblock {
UnblockKind::Ready => {
// The condvar was signaled. Make sure we get the clock for that.
if let Some(data_race) = &this.machine.data_race {
data_race.acquire_clock(
&this.machine.sync.condvars[condvar].clock,
&this.machine.threads,
);
}
// Try to acquire the mutex.
// The timeout only applies to the first wait (until the signal), not for mutex acquisition.
this.condvar_reacquire_mutex(&mutex_ref, retval_succ, dest)
}
UnblockKind::TimedOut => {
// We have to remove the waiter from the queue again.
let thread = this.active_thread();
let waiters = &mut this.machine.sync.condvars[condvar].waiters;
waiters.retain(|waiter| *waiter != thread);
// Now get back the lock.
this.condvar_reacquire_mutex(&mutex_ref, retval_timeout, dest)
}
}
// Try to acquire the mutex.
// The timeout only applies to the first wait (until the signal), not for mutex acquisition.
this.condvar_reacquire_mutex(&mutex_ref, retval_succ, dest)
}
@timeout = |this| {
// We have to remove the waiter from the queue again.
let thread = this.active_thread();
let waiters = &mut this.machine.sync.condvars[condvar].waiters;
waiters.retain(|waiter| *waiter != thread);
// Now get back the lock.
this.condvar_reacquire_mutex(&mutex_ref, retval_timeout, dest)
}
),
);
Expand Down Expand Up @@ -752,25 +760,29 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
dest: MPlaceTy<'tcx>,
errno_timeout: IoError,
}
@unblock = |this| {
let futex = futex_ref.0.borrow();
// Acquire the clock of the futex.
if let Some(data_race) = &this.machine.data_race {
data_race.acquire_clock(&futex.clock, &this.machine.threads);
|this, unblock: UnblockKind| {
match unblock {
UnblockKind::Ready => {
let futex = futex_ref.0.borrow();
// Acquire the clock of the futex.
if let Some(data_race) = &this.machine.data_race {
data_race.acquire_clock(&futex.clock, &this.machine.threads);
}
// Write the return value.
this.write_scalar(retval_succ, &dest)?;
interp_ok(())
},
UnblockKind::TimedOut => {
// Remove the waiter from the futex.
let thread = this.active_thread();
let mut futex = futex_ref.0.borrow_mut();
futex.waiters.retain(|waiter| waiter.thread != thread);
// Set errno and write return value.
this.set_last_error(errno_timeout)?;
this.write_scalar(retval_timeout, &dest)?;
interp_ok(())
},
}
// Write the return value.
this.write_scalar(retval_succ, &dest)?;
interp_ok(())
}
@timeout = |this| {
// Remove the waiter from the futex.
let thread = this.active_thread();
let mut futex = futex_ref.0.borrow_mut();
futex.waiters.retain(|waiter| waiter.thread != thread);
// Set errno and write return value.
this.set_last_error(errno_timeout)?;
this.write_scalar(retval_timeout, &dest)?;
interp_ok(())
}
),
);
Expand Down
79 changes: 13 additions & 66 deletions src/concurrency/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,71 +38,17 @@ pub enum TlsAllocAction {
Leak,
}

/// Trait for callbacks that are executed when a thread gets unblocked.
pub trait UnblockCallback<'tcx>: VisitProvenance {
/// Will be invoked when the thread was unblocked the "regular" way,
/// i.e. whatever event it was blocking on has happened.
fn unblock(self: Box<Self>, ecx: &mut InterpCx<'tcx, MiriMachine<'tcx>>) -> InterpResult<'tcx>;

/// Will be invoked when the timeout ellapsed without the event the
/// thread was blocking on having occurred.
fn timeout(self: Box<Self>, _ecx: &mut InterpCx<'tcx, MiriMachine<'tcx>>)
-> InterpResult<'tcx>;
/// The argument type for the "unblock" callback, indicating why the thread got unblocked.
#[derive(Debug, PartialEq)]
pub enum UnblockKind {
/// Operation completed successfully, thread continues normal execution.
Ready,
/// The operation did not complete within its specified duration.
TimedOut,
}
pub type DynUnblockCallback<'tcx> = Box<dyn UnblockCallback<'tcx> + 'tcx>;

#[macro_export]
macro_rules! callback {
(
@capture<$tcx:lifetime $(,)? $($lft:lifetime),*> { $($name:ident: $type:ty),* $(,)? }
@unblock = |$this:ident| $unblock:block
) => {
callback!(
@capture<$tcx, $($lft),*> { $($name: $type),* }
@unblock = |$this| $unblock
@timeout = |_this| {
unreachable!(
"timeout on a thread that was blocked without a timeout (or someone forgot to overwrite this method)"
)
}
)
};
(
@capture<$tcx:lifetime $(,)? $($lft:lifetime),*> { $($name:ident: $type:ty),* $(,)? }
@unblock = |$this:ident| $unblock:block
@timeout = |$this_timeout:ident| $timeout:block
) => {{
struct Callback<$tcx, $($lft),*> {
$($name: $type,)*
_phantom: std::marker::PhantomData<&$tcx ()>,
}

impl<$tcx, $($lft),*> VisitProvenance for Callback<$tcx, $($lft),*> {
#[allow(unused_variables)]
fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
$(
self.$name.visit_provenance(visit);
)*
}
}

impl<$tcx, $($lft),*> UnblockCallback<$tcx> for Callback<$tcx, $($lft),*> {
fn unblock(self: Box<Self>, $this: &mut MiriInterpCx<$tcx>) -> InterpResult<$tcx> {
#[allow(unused_variables)]
let Callback { $($name,)* _phantom } = *self;
$unblock
}

fn timeout(self: Box<Self>, $this_timeout: &mut MiriInterpCx<$tcx>) -> InterpResult<$tcx> {
#[allow(unused_variables)]
let Callback { $($name,)* _phantom } = *self;
$timeout
}
}

Box::new(Callback { $($name,)* _phantom: std::marker::PhantomData })
}}
}
/// Type alias for unblock callbacks using UnblockKind argument.
pub type DynUnblockCallback<'tcx> = DynMachineCallback<'tcx, UnblockKind>;

/// A thread identifier.
#[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)]
Expand Down Expand Up @@ -656,7 +602,8 @@ impl<'tcx> ThreadManager<'tcx> {
@capture<'tcx> {
joined_thread_id: ThreadId,
}
@unblock = |this| {
|this, unblock: UnblockKind| {
assert_eq!(unblock, UnblockKind::Ready);
if let Some(data_race) = &mut this.machine.data_race {
data_race.thread_joined(&this.machine.threads, joined_thread_id);
}
Expand Down Expand Up @@ -842,7 +789,7 @@ trait EvalContextPrivExt<'tcx>: MiriInterpCxExt<'tcx> {
// 2. Make the scheduler the only place that can change the active
// thread.
let old_thread = this.machine.threads.set_active_thread_id(thread);
callback.timeout(this)?;
callback.call(this, UnblockKind::TimedOut)?;
this.machine.threads.set_active_thread_id(old_thread);
}
// found_callback can remain None if the computer's clock
Expand Down Expand Up @@ -1084,7 +1031,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
};
// The callback must be executed in the previously blocked thread.
let old_thread = this.machine.threads.set_active_thread_id(thread);
callback.unblock(this)?;
callback.call(this, UnblockKind::Ready)?;
this.machine.threads.set_active_thread_id(old_thread);
interp_ok(())
}
Expand Down
8 changes: 4 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ pub use crate::concurrency::sync::{
CondvarId, EvalContextExt as _, MutexRef, RwLockId, SynchronizationObjects,
};
pub use crate::concurrency::thread::{
BlockReason, EvalContextExt as _, StackEmptyCallback, ThreadId, ThreadManager, TimeoutAnchor,
TimeoutClock, UnblockCallback,
BlockReason, DynUnblockCallback, EvalContextExt as _, StackEmptyCallback, ThreadId,
ThreadManager, TimeoutAnchor, TimeoutClock, UnblockKind,
};
pub use crate::diagnostics::{
EvalContextExt as _, NonHaltingDiagnostic, TerminationInfo, report_error,
Expand All @@ -141,8 +141,8 @@ pub use crate::eval::{
pub use crate::helpers::{AccessKind, EvalContextExt as _};
pub use crate::intrinsics::EvalContextExt as _;
pub use crate::machine::{
AllocExtra, FrameExtra, MemoryKind, MiriInterpCx, MiriInterpCxExt, MiriMachine, MiriMemoryKind,
PrimitiveLayouts, Provenance, ProvenanceExtra,
AllocExtra, DynMachineCallback, FrameExtra, MachineCallback, MemoryKind, MiriInterpCx,
MiriInterpCxExt, MiriMachine, MiriMemoryKind, PrimitiveLayouts, Provenance, ProvenanceExtra,
};
pub use crate::mono_hash_map::MonoHashMap;
pub use crate::operator::EvalContextExt as _;
Expand Down
84 changes: 84 additions & 0 deletions src/machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1723,3 +1723,87 @@ impl<'tcx> Machine<'tcx> for MiriMachine<'tcx> {
Cow::Borrowed(ecx.machine.union_data_ranges.entry(ty).or_insert_with(compute_range))
}
}

/// Trait for callbacks handling asynchronous machine operations.
pub trait MachineCallback<'tcx, T>: VisitProvenance {
/// The function to be invoked when the callback is fired.
fn call(
self: Box<Self>,
ecx: &mut InterpCx<'tcx, MiriMachine<'tcx>>,
arg: T,
) -> InterpResult<'tcx>;
}

/// Type alias for boxed machine callbacks with generic argument type.
pub type DynMachineCallback<'tcx, T> = Box<dyn MachineCallback<'tcx, T> + 'tcx>;

/// Creates a callback for blocking operations with captured state.
///
/// When a thread blocks on a resource (as defined in `enum BlockReason`), this callback
/// executes once that resource becomes available. The callback captures needed
/// variables and handles the completion of the blocking operation.
///
/// # Example
/// ```rust
/// // Block thread until mutex is available
/// this.block_thread(
/// BlockReason::Mutex,
/// None,
/// callback!(
/// @capture<'tcx> {
/// mutex_ref: MutexRef,
/// retval: Scalar,
/// dest: MPlaceTy<'tcx>,
/// }
/// |this, unblock: UnblockKind| {
/// // Verify successful mutex acquisition
/// assert_eq!(unblock, UnblockKind::Ready);
///
/// // Enter critical section
/// this.mutex_lock(&mutex_ref);
///
/// // Process protected data and store result
/// this.write_scalar(retval, &dest)?;
///
/// // Exit critical section implicitly when callback completes
/// interp_ok(())
/// }
/// ),
/// );
/// ```
#[macro_export]
macro_rules! callback {
(@capture<$tcx:lifetime $(,)? $($lft:lifetime),*>
{ $($name:ident: $type:ty),* $(,)? }
|$this:ident, $arg:ident: $arg_ty:ty| $body:expr $(,)?) => {{
struct Callback<$tcx, $($lft),*> {
$($name: $type,)*
_phantom: std::marker::PhantomData<&$tcx ()>,
}

impl<$tcx, $($lft),*> VisitProvenance for Callback<$tcx, $($lft),*> {
fn visit_provenance(&self, _visit: &mut VisitWith<'_>) {
$(
self.$name.visit_provenance(_visit);
)*
}
}

impl<$tcx, $($lft),*> MachineCallback<$tcx, $arg_ty> for Callback<$tcx, $($lft),*> {
fn call(
self: Box<Self>,
$this: &mut MiriInterpCx<$tcx>,
$arg: $arg_ty
) -> InterpResult<$tcx> {
#[allow(unused_variables)]
let Callback { $($name,)* _phantom } = *self;
$body
}
}

Box::new(Callback {
$($name,)*
_phantom: std::marker::PhantomData
})
}};
}
12 changes: 8 additions & 4 deletions src/shims/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,10 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
Some((TimeoutClock::Monotonic, TimeoutAnchor::Relative, duration)),
callback!(
@capture<'tcx> {}
@unblock = |_this| { panic!("sleeping thread unblocked before time is up") }
@timeout = |_this| { interp_ok(()) }
|_this, unblock: UnblockKind| {
assert_eq!(unblock, UnblockKind::TimedOut);
interp_ok(())
}
),
);
interp_ok(Scalar::from_i32(0))
Expand All @@ -353,8 +355,10 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
Some((TimeoutClock::Monotonic, TimeoutAnchor::Relative, duration)),
callback!(
@capture<'tcx> {}
@unblock = |_this| { panic!("sleeping thread unblocked before time is up") }
@timeout = |_this| { interp_ok(()) }
|_this, unblock: UnblockKind| {
assert_eq!(unblock, UnblockKind::TimedOut);
interp_ok(())
}
),
);
interp_ok(())
Expand Down
Loading

0 comments on commit 42ef1d7

Please sign in to comment.