Skip to content

Commit 342a2b6

Browse files
committed
concurrency: Generalize UnblockCallback to MachineCallback
* Introduce UnblockKind enum to represent operation outcomes * Consolidate unblock/timeout methods into single callback interface * Update thread blocking system to use new callback mechanism * Refactor mutex and condvar implementations for new callback pattern Signed-off-by: shamb0 <r.raajey@gmail.com>
1 parent 22fd2f3 commit 342a2b6

File tree

10 files changed

+186
-130
lines changed

10 files changed

+186
-130
lines changed

src/tools/miri/src/concurrency/sync.rs

Lines changed: 51 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -422,7 +422,9 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
422422
mutex_ref: MutexRef,
423423
retval_dest: Option<(Scalar, MPlaceTy<'tcx>)>,
424424
}
425-
@unblock = |this| {
425+
|this, unblock: UnblockKind| {
426+
assert_eq!(unblock, UnblockKind::Ready);
427+
426428
assert!(!this.mutex_is_locked(&mutex_ref));
427429
this.mutex_lock(&mutex_ref);
428430

@@ -538,7 +540,8 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
538540
retval: Scalar,
539541
dest: MPlaceTy<'tcx>,
540542
}
541-
@unblock = |this| {
543+
|this, unblock: UnblockKind| {
544+
assert_eq!(unblock, UnblockKind::Ready);
542545
this.rwlock_reader_lock(id);
543546
this.write_scalar(retval, &dest)?;
544547
interp_ok(())
@@ -623,7 +626,8 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
623626
retval: Scalar,
624627
dest: MPlaceTy<'tcx>,
625628
}
626-
@unblock = |this| {
629+
|this, unblock: UnblockKind| {
630+
assert_eq!(unblock, UnblockKind::Ready);
627631
this.rwlock_writer_lock(id);
628632
this.write_scalar(retval, &dest)?;
629633
interp_ok(())
@@ -677,25 +681,29 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
677681
retval_timeout: Scalar,
678682
dest: MPlaceTy<'tcx>,
679683
}
680-
@unblock = |this| {
681-
// The condvar was signaled. Make sure we get the clock for that.
682-
if let Some(data_race) = &this.machine.data_race {
683-
data_race.acquire_clock(
684-
&this.machine.sync.condvars[condvar].clock,
685-
&this.machine.threads,
686-
);
684+
|this, unblock: UnblockKind| {
685+
match unblock {
686+
UnblockKind::Ready => {
687+
// The condvar was signaled. Make sure we get the clock for that.
688+
if let Some(data_race) = &this.machine.data_race {
689+
data_race.acquire_clock(
690+
&this.machine.sync.condvars[condvar].clock,
691+
&this.machine.threads,
692+
);
693+
}
694+
// Try to acquire the mutex.
695+
// The timeout only applies to the first wait (until the signal), not for mutex acquisition.
696+
this.condvar_reacquire_mutex(&mutex_ref, retval_succ, dest)
697+
}
698+
UnblockKind::TimedOut => {
699+
// We have to remove the waiter from the queue again.
700+
let thread = this.active_thread();
701+
let waiters = &mut this.machine.sync.condvars[condvar].waiters;
702+
waiters.retain(|waiter| *waiter != thread);
703+
// Now get back the lock.
704+
this.condvar_reacquire_mutex(&mutex_ref, retval_timeout, dest)
705+
}
687706
}
688-
// Try to acquire the mutex.
689-
// The timeout only applies to the first wait (until the signal), not for mutex acquisition.
690-
this.condvar_reacquire_mutex(&mutex_ref, retval_succ, dest)
691-
}
692-
@timeout = |this| {
693-
// We have to remove the waiter from the queue again.
694-
let thread = this.active_thread();
695-
let waiters = &mut this.machine.sync.condvars[condvar].waiters;
696-
waiters.retain(|waiter| *waiter != thread);
697-
// Now get back the lock.
698-
this.condvar_reacquire_mutex(&mutex_ref, retval_timeout, dest)
699707
}
700708
),
701709
);
@@ -752,25 +760,29 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
752760
dest: MPlaceTy<'tcx>,
753761
errno_timeout: IoError,
754762
}
755-
@unblock = |this| {
756-
let futex = futex_ref.0.borrow();
757-
// Acquire the clock of the futex.
758-
if let Some(data_race) = &this.machine.data_race {
759-
data_race.acquire_clock(&futex.clock, &this.machine.threads);
763+
|this, unblock: UnblockKind| {
764+
match unblock {
765+
UnblockKind::Ready => {
766+
let futex = futex_ref.0.borrow();
767+
// Acquire the clock of the futex.
768+
if let Some(data_race) = &this.machine.data_race {
769+
data_race.acquire_clock(&futex.clock, &this.machine.threads);
770+
}
771+
// Write the return value.
772+
this.write_scalar(retval_succ, &dest)?;
773+
interp_ok(())
774+
},
775+
UnblockKind::TimedOut => {
776+
// Remove the waiter from the futex.
777+
let thread = this.active_thread();
778+
let mut futex = futex_ref.0.borrow_mut();
779+
futex.waiters.retain(|waiter| waiter.thread != thread);
780+
// Set errno and write return value.
781+
this.set_last_error(errno_timeout)?;
782+
this.write_scalar(retval_timeout, &dest)?;
783+
interp_ok(())
784+
},
760785
}
761-
// Write the return value.
762-
this.write_scalar(retval_succ, &dest)?;
763-
interp_ok(())
764-
}
765-
@timeout = |this| {
766-
// Remove the waiter from the futex.
767-
let thread = this.active_thread();
768-
let mut futex = futex_ref.0.borrow_mut();
769-
futex.waiters.retain(|waiter| waiter.thread != thread);
770-
// Set errno and write return value.
771-
this.set_last_error(errno_timeout)?;
772-
this.write_scalar(retval_timeout, &dest)?;
773-
interp_ok(())
774786
}
775787
),
776788
);

src/tools/miri/src/concurrency/thread.rs

Lines changed: 13 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -38,71 +38,17 @@ pub enum TlsAllocAction {
3838
Leak,
3939
}
4040

41-
/// Trait for callbacks that are executed when a thread gets unblocked.
42-
pub trait UnblockCallback<'tcx>: VisitProvenance {
43-
/// Will be invoked when the thread was unblocked the "regular" way,
44-
/// i.e. whatever event it was blocking on has happened.
45-
fn unblock(self: Box<Self>, ecx: &mut InterpCx<'tcx, MiriMachine<'tcx>>) -> InterpResult<'tcx>;
46-
47-
/// Will be invoked when the timeout ellapsed without the event the
48-
/// thread was blocking on having occurred.
49-
fn timeout(self: Box<Self>, _ecx: &mut InterpCx<'tcx, MiriMachine<'tcx>>)
50-
-> InterpResult<'tcx>;
41+
/// The argument type for the "unblock" callback, indicating why the thread got unblocked.
42+
#[derive(Debug, PartialEq)]
43+
pub enum UnblockKind {
44+
/// Operation completed successfully, thread continues normal execution.
45+
Ready,
46+
/// The operation did not complete within its specified duration.
47+
TimedOut,
5148
}
52-
pub type DynUnblockCallback<'tcx> = Box<dyn UnblockCallback<'tcx> + 'tcx>;
53-
54-
#[macro_export]
55-
macro_rules! callback {
56-
(
57-
@capture<$tcx:lifetime $(,)? $($lft:lifetime),*> { $($name:ident: $type:ty),* $(,)? }
58-
@unblock = |$this:ident| $unblock:block
59-
) => {
60-
callback!(
61-
@capture<$tcx, $($lft),*> { $($name: $type),* }
62-
@unblock = |$this| $unblock
63-
@timeout = |_this| {
64-
unreachable!(
65-
"timeout on a thread that was blocked without a timeout (or someone forgot to overwrite this method)"
66-
)
67-
}
68-
)
69-
};
70-
(
71-
@capture<$tcx:lifetime $(,)? $($lft:lifetime),*> { $($name:ident: $type:ty),* $(,)? }
72-
@unblock = |$this:ident| $unblock:block
73-
@timeout = |$this_timeout:ident| $timeout:block
74-
) => {{
75-
struct Callback<$tcx, $($lft),*> {
76-
$($name: $type,)*
77-
_phantom: std::marker::PhantomData<&$tcx ()>,
78-
}
79-
80-
impl<$tcx, $($lft),*> VisitProvenance for Callback<$tcx, $($lft),*> {
81-
#[allow(unused_variables)]
82-
fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
83-
$(
84-
self.$name.visit_provenance(visit);
85-
)*
86-
}
87-
}
8849

89-
impl<$tcx, $($lft),*> UnblockCallback<$tcx> for Callback<$tcx, $($lft),*> {
90-
fn unblock(self: Box<Self>, $this: &mut MiriInterpCx<$tcx>) -> InterpResult<$tcx> {
91-
#[allow(unused_variables)]
92-
let Callback { $($name,)* _phantom } = *self;
93-
$unblock
94-
}
95-
96-
fn timeout(self: Box<Self>, $this_timeout: &mut MiriInterpCx<$tcx>) -> InterpResult<$tcx> {
97-
#[allow(unused_variables)]
98-
let Callback { $($name,)* _phantom } = *self;
99-
$timeout
100-
}
101-
}
102-
103-
Box::new(Callback { $($name,)* _phantom: std::marker::PhantomData })
104-
}}
105-
}
50+
/// Type alias for unblock callbacks using UnblockKind argument.
51+
pub type DynUnblockCallback<'tcx> = DynMachineCallback<'tcx, UnblockKind>;
10652

10753
/// A thread identifier.
10854
#[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)]
@@ -656,7 +602,8 @@ impl<'tcx> ThreadManager<'tcx> {
656602
@capture<'tcx> {
657603
joined_thread_id: ThreadId,
658604
}
659-
@unblock = |this| {
605+
|this, unblock: UnblockKind| {
606+
assert_eq!(unblock, UnblockKind::Ready);
660607
if let Some(data_race) = &mut this.machine.data_race {
661608
data_race.thread_joined(&this.machine.threads, joined_thread_id);
662609
}
@@ -842,7 +789,7 @@ trait EvalContextPrivExt<'tcx>: MiriInterpCxExt<'tcx> {
842789
// 2. Make the scheduler the only place that can change the active
843790
// thread.
844791
let old_thread = this.machine.threads.set_active_thread_id(thread);
845-
callback.timeout(this)?;
792+
callback.call(this, UnblockKind::TimedOut)?;
846793
this.machine.threads.set_active_thread_id(old_thread);
847794
}
848795
// found_callback can remain None if the computer's clock
@@ -1084,7 +1031,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
10841031
};
10851032
// The callback must be executed in the previously blocked thread.
10861033
let old_thread = this.machine.threads.set_active_thread_id(thread);
1087-
callback.unblock(this)?;
1034+
callback.call(this, UnblockKind::Ready)?;
10881035
this.machine.threads.set_active_thread_id(old_thread);
10891036
interp_ok(())
10901037
}

src/tools/miri/src/lib.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -128,8 +128,8 @@ pub use crate::concurrency::sync::{
128128
CondvarId, EvalContextExt as _, MutexRef, RwLockId, SynchronizationObjects,
129129
};
130130
pub use crate::concurrency::thread::{
131-
BlockReason, EvalContextExt as _, StackEmptyCallback, ThreadId, ThreadManager, TimeoutAnchor,
132-
TimeoutClock, UnblockCallback,
131+
BlockReason, DynUnblockCallback, EvalContextExt as _, StackEmptyCallback, ThreadId,
132+
ThreadManager, TimeoutAnchor, TimeoutClock, UnblockKind,
133133
};
134134
pub use crate::diagnostics::{
135135
EvalContextExt as _, NonHaltingDiagnostic, TerminationInfo, report_error,
@@ -141,8 +141,8 @@ pub use crate::eval::{
141141
pub use crate::helpers::{AccessKind, EvalContextExt as _};
142142
pub use crate::intrinsics::EvalContextExt as _;
143143
pub use crate::machine::{
144-
AllocExtra, FrameExtra, MemoryKind, MiriInterpCx, MiriInterpCxExt, MiriMachine, MiriMemoryKind,
145-
PrimitiveLayouts, Provenance, ProvenanceExtra,
144+
AllocExtra, DynMachineCallback, FrameExtra, MachineCallback, MemoryKind, MiriInterpCx,
145+
MiriInterpCxExt, MiriMachine, MiriMemoryKind, PrimitiveLayouts, Provenance, ProvenanceExtra,
146146
};
147147
pub use crate::mono_hash_map::MonoHashMap;
148148
pub use crate::operator::EvalContextExt as _;

src/tools/miri/src/machine.rs

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1723,3 +1723,87 @@ impl<'tcx> Machine<'tcx> for MiriMachine<'tcx> {
17231723
Cow::Borrowed(ecx.machine.union_data_ranges.entry(ty).or_insert_with(compute_range))
17241724
}
17251725
}
1726+
1727+
/// Trait for callbacks handling asynchronous machine operations.
1728+
pub trait MachineCallback<'tcx, T>: VisitProvenance {
1729+
/// The function to be invoked when the callback is fired.
1730+
fn call(
1731+
self: Box<Self>,
1732+
ecx: &mut InterpCx<'tcx, MiriMachine<'tcx>>,
1733+
arg: T,
1734+
) -> InterpResult<'tcx>;
1735+
}
1736+
1737+
/// Type alias for boxed machine callbacks with generic argument type.
1738+
pub type DynMachineCallback<'tcx, T> = Box<dyn MachineCallback<'tcx, T> + 'tcx>;
1739+
1740+
/// Creates a callback for blocking operations with captured state.
1741+
///
1742+
/// When a thread blocks on a resource (as defined in `enum BlockReason`), this callback
1743+
/// executes once that resource becomes available. The callback captures needed
1744+
/// variables and handles the completion of the blocking operation.
1745+
///
1746+
/// # Example
1747+
/// ```rust
1748+
/// // Block thread until mutex is available
1749+
/// this.block_thread(
1750+
/// BlockReason::Mutex,
1751+
/// None,
1752+
/// callback!(
1753+
/// @capture<'tcx> {
1754+
/// mutex_ref: MutexRef,
1755+
/// retval: Scalar,
1756+
/// dest: MPlaceTy<'tcx>,
1757+
/// }
1758+
/// |this, unblock: UnblockKind| {
1759+
/// // Verify successful mutex acquisition
1760+
/// assert_eq!(unblock, UnblockKind::Ready);
1761+
///
1762+
/// // Enter critical section
1763+
/// this.mutex_lock(&mutex_ref);
1764+
///
1765+
/// // Process protected data and store result
1766+
/// this.write_scalar(retval, &dest)?;
1767+
///
1768+
/// // Exit critical section implicitly when callback completes
1769+
/// interp_ok(())
1770+
/// }
1771+
/// ),
1772+
/// );
1773+
/// ```
1774+
#[macro_export]
1775+
macro_rules! callback {
1776+
(@capture<$tcx:lifetime $(,)? $($lft:lifetime),*>
1777+
{ $($name:ident: $type:ty),* $(,)? }
1778+
|$this:ident, $arg:ident: $arg_ty:ty| $body:expr $(,)?) => {{
1779+
struct Callback<$tcx, $($lft),*> {
1780+
$($name: $type,)*
1781+
_phantom: std::marker::PhantomData<&$tcx ()>,
1782+
}
1783+
1784+
impl<$tcx, $($lft),*> VisitProvenance for Callback<$tcx, $($lft),*> {
1785+
fn visit_provenance(&self, _visit: &mut VisitWith<'_>) {
1786+
$(
1787+
self.$name.visit_provenance(_visit);
1788+
)*
1789+
}
1790+
}
1791+
1792+
impl<$tcx, $($lft),*> MachineCallback<$tcx, $arg_ty> for Callback<$tcx, $($lft),*> {
1793+
fn call(
1794+
self: Box<Self>,
1795+
$this: &mut MiriInterpCx<$tcx>,
1796+
$arg: $arg_ty
1797+
) -> InterpResult<$tcx> {
1798+
#[allow(unused_variables)]
1799+
let Callback { $($name,)* _phantom } = *self;
1800+
$body
1801+
}
1802+
}
1803+
1804+
Box::new(Callback {
1805+
$($name,)*
1806+
_phantom: std::marker::PhantomData
1807+
})
1808+
}};
1809+
}

src/tools/miri/src/shims/time.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -331,8 +331,10 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
331331
Some((TimeoutClock::Monotonic, TimeoutAnchor::Relative, duration)),
332332
callback!(
333333
@capture<'tcx> {}
334-
@unblock = |_this| { panic!("sleeping thread unblocked before time is up") }
335-
@timeout = |_this| { interp_ok(()) }
334+
|_this, unblock: UnblockKind| {
335+
assert_eq!(unblock, UnblockKind::TimedOut);
336+
interp_ok(())
337+
}
336338
),
337339
);
338340
interp_ok(Scalar::from_i32(0))
@@ -353,8 +355,10 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
353355
Some((TimeoutClock::Monotonic, TimeoutAnchor::Relative, duration)),
354356
callback!(
355357
@capture<'tcx> {}
356-
@unblock = |_this| { panic!("sleeping thread unblocked before time is up") }
357-
@timeout = |_this| { interp_ok(()) }
358+
|_this, unblock: UnblockKind| {
359+
assert_eq!(unblock, UnblockKind::TimedOut);
360+
interp_ok(())
361+
}
358362
),
359363
);
360364
interp_ok(())

0 commit comments

Comments
 (0)