Skip to content

Commit dbf93c7

Browse files
authored
sync: fix incorrect is_empty on mpsc block boundaries (#6603)
1 parent 873cb8a commit dbf93c7

File tree

2 files changed

+21
-6
lines changed

2 files changed

+21
-6
lines changed

tokio/src/sync/mpsc/block.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -168,14 +168,17 @@ impl<T> Block<T> {
168168
Some(Read::Value(value.assume_init()))
169169
}
170170

171-
/// Returns true if there is a value in the slot to be consumed
171+
/// Returns true if *this* block has a value in the given slot.
172172
///
173-
/// # Safety
174-
///
175-
/// To maintain safety, the caller must ensure:
176-
///
177-
/// * No concurrent access to the slot.
173+
/// Always returns false when given an index from a different block.
178174
pub(crate) fn has_value(&self, slot_index: usize) -> bool {
175+
if slot_index < self.header.start_index {
176+
return false;
177+
}
178+
if slot_index >= self.header.start_index + super::BLOCK_CAP {
179+
return false;
180+
}
181+
179182
let offset = offset(slot_index);
180183
let ready_bits = self.header.ready_slots.load(Acquire);
181184
is_ready(ready_bits, offset)

tokio/tests/sync_mpsc.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1421,4 +1421,16 @@ async fn test_rx_unbounded_len_when_close_is_called_after_dropping_sender() {
14211421
assert_eq!(rx.len(), 1);
14221422
}
14231423

1424+
// Regression test for https://github.com/tokio-rs/tokio/issues/6602
1425+
#[tokio::test]
1426+
async fn test_is_empty_32_msgs() {
1427+
let (sender, mut receiver) = mpsc::channel(33);
1428+
1429+
for value in 1..257 {
1430+
sender.send(value).await.unwrap();
1431+
receiver.recv().await.unwrap();
1432+
assert!(receiver.is_empty(), "{value}. len: {}", receiver.len());
1433+
}
1434+
}
1435+
14241436
fn is_debug<T: fmt::Debug>(_: &T) {}

0 commit comments

Comments
 (0)