Skip to content

Commit adbb5b4

Browse files
committed
Extract a base runBlocking liveness test
1 parent de06466 commit adbb5b4

File tree

2 files changed

+51
-44
lines changed

2 files changed

+51
-44
lines changed
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package scheduling
2+
3+
import kotlinx.coroutines.*
4+
import kotlinx.coroutines.scheduling.*
5+
import kotlinx.coroutines.testing.*
6+
import java.util.*
7+
import java.util.concurrent.*
8+
9+
open class RunBlockingCoroutineSchedulerLivenessTestBase : SchedulerTestBase() {
10+
protected fun testSchedulerLiveness(targetDispatcher: CoroutineDispatcher, yieldMask: Int = 0b1111): Unit = runBlocking {
11+
val oldRunBlockings = LinkedList<Job>()
12+
var maxOldRunBlockings = 0
13+
var busyWaits = 0
14+
repeat(5000 * stressTestMultiplierSqrt) {
15+
if (it % 1000 == 0) {
16+
System.err.println("======== $it, rb=${oldRunBlockings.size}, max rb=${maxOldRunBlockings}, busy=$busyWaits")
17+
}
18+
val barrier = CyclicBarrier(2)
19+
val barrier2 = CompletableDeferred<Unit>()
20+
val blocking = launch(targetDispatcher) {
21+
barrier.await()
22+
runBlocking {
23+
if ((yieldMask and 1) != 0) yield()
24+
barrier2.await()
25+
if ((yieldMask and 2) != 0) yield()
26+
}
27+
}
28+
oldRunBlockings.addLast(blocking)
29+
val task = async(targetDispatcher) {
30+
if ((yieldMask and 4) != 0) yield()
31+
42.also {
32+
if ((yieldMask and 8) != 0) yield()
33+
}
34+
}
35+
barrier.await()
36+
task.join()
37+
barrier2.complete(Unit)
38+
39+
oldRunBlockings.removeIf(Job::isCompleted)
40+
while (oldRunBlockings.size > 5) {
41+
busyWaits++
42+
oldRunBlockings.removeIf(Job::isCompleted)
43+
}
44+
if (oldRunBlockings.size > maxOldRunBlockings) {
45+
maxOldRunBlockings = oldRunBlockings.size
46+
}
47+
}
48+
}
49+
}
Lines changed: 2 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,12 @@
11
package scheduling
22

3-
import kotlinx.coroutines.*
4-
import kotlinx.coroutines.scheduling.*
53
import kotlinx.coroutines.testing.*
64
import org.junit.*
75
import org.junit.runner.*
86
import org.junit.runners.*
9-
import java.util.*
10-
import java.util.concurrent.*
117

128
@RunWith(Parameterized::class)
13-
class RunBlockingDefaultDispatcherLivenessStressTest(private val yieldMask: Int) : SchedulerTestBase() {
9+
class RunBlockingDefaultDispatcherLivenessStressTest(private val yieldMask: Int) : RunBlockingCoroutineSchedulerLivenessTestBase() {
1410
init {
1511
corePoolSize = 1
1612
}
@@ -24,43 +20,5 @@ class RunBlockingDefaultDispatcherLivenessStressTest(private val yieldMask: Int)
2420
}
2521

2622
@Test
27-
fun testLivenessOfDefaultDispatcher(): Unit = runBlocking {
28-
val oldRunBlockings = LinkedList<Job>()
29-
var maxOldRunBlockings = 0
30-
var busyWaits = 0
31-
repeat(5000 * stressTestMultiplierSqrt) {
32-
if (it % 1000 == 0) {
33-
System.err.println("======== $it, rb=${oldRunBlockings.size}, max rb=${maxOldRunBlockings}, busy=$busyWaits")
34-
}
35-
val barrier = CyclicBarrier(2)
36-
val barrier2 = CompletableDeferred<Unit>()
37-
val blocking = launch(dispatcher) {
38-
barrier.await()
39-
runBlocking {
40-
if ((yieldMask and 1) != 0) yield()
41-
barrier2.await()
42-
if ((yieldMask and 2) != 0) yield()
43-
}
44-
}
45-
oldRunBlockings.addLast(blocking)
46-
val task = async(dispatcher) {
47-
if ((yieldMask and 4) != 0) yield()
48-
42.also {
49-
if ((yieldMask and 8) != 0) yield()
50-
}
51-
}
52-
barrier.await()
53-
task.join()
54-
barrier2.complete(Unit)
55-
56-
oldRunBlockings.removeIf(Job::isCompleted)
57-
while (oldRunBlockings.size > 5) {
58-
busyWaits++
59-
oldRunBlockings.removeIf(Job::isCompleted)
60-
}
61-
if (oldRunBlockings.size > maxOldRunBlockings) {
62-
maxOldRunBlockings = oldRunBlockings.size
63-
}
64-
}
65-
}
23+
fun testLivenessOfDefaultDispatcher(): Unit = testSchedulerLiveness(dispatcher, yieldMask)
6624
}

0 commit comments

Comments
 (0)