Skip to content

Commit 8133c97

Browse files
authored
Fix limitedParallelism implementation on K/N (#3226)
The initial implementation predates new memory model and was never working on it Fixes #3223
1 parent 6c326e4 commit 8133c97

File tree

4 files changed

+99
-28
lines changed

4 files changed

+99
-28
lines changed

kotlinx-coroutines-core/common/src/internal/LimitedDispatcher.kt

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ internal class LimitedDispatcher(
2323

2424
private val queue = LockFreeTaskQueue<Runnable>(singleConsumer = false)
2525

26+
// A separate object that we can synchronize on for K/N
27+
private val workerAllocationLock = SynchronizedObject()
28+
2629
@ExperimentalCoroutinesApi
2730
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
2831
parallelism.checkParallelism()
@@ -50,8 +53,7 @@ internal class LimitedDispatcher(
5053
continue
5154
}
5255

53-
@Suppress("CAST_NEVER_SUCCEEDS")
54-
synchronized(this as SynchronizedObject) {
56+
synchronized(workerAllocationLock) {
5557
--runningWorkers
5658
if (queue.size == 0) return
5759
++runningWorkers
@@ -87,8 +89,7 @@ internal class LimitedDispatcher(
8789
}
8890

8991
private fun tryAllocateWorker(): Boolean {
90-
@Suppress("CAST_NEVER_SUCCEEDS")
91-
synchronized(this as SynchronizedObject) {
92+
synchronized(workerAllocationLock) {
9293
if (runningWorkers >= parallelism) return false
9394
++runningWorkers
9495
return true
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright 2016-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
package kotlinx.coroutines
6+
7+
import kotlin.test.*
8+
9+
class LimitedParallelismSharedTest : TestBase() {
10+
11+
@Test
12+
fun testLimitedDefault() = runTest {
13+
// Test that evaluates the very basic completion of tasks in limited dispatcher
14+
// for all supported platforms.
15+
// For more specific and concurrent tests, see 'concurrent' package.
16+
val view = Dispatchers.Default.limitedParallelism(1)
17+
val view2 = Dispatchers.Default.limitedParallelism(1)
18+
val j1 = launch(view) {
19+
while (true) {
20+
yield()
21+
}
22+
}
23+
val j2 = launch(view2) { j1.cancel() }
24+
joinAll(j1, j2)
25+
}
26+
27+
@Test
28+
fun testParallelismSpec() {
29+
assertFailsWith<IllegalArgumentException> { Dispatchers.Default.limitedParallelism(0) }
30+
assertFailsWith<IllegalArgumentException> { Dispatchers.Default.limitedParallelism(-1) }
31+
assertFailsWith<IllegalArgumentException> { Dispatchers.Default.limitedParallelism(Int.MIN_VALUE) }
32+
Dispatchers.Default.limitedParallelism(Int.MAX_VALUE)
33+
}
34+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright 2016-2022 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3+
*/
4+
5+
import kotlinx.atomicfu.*
6+
import kotlinx.coroutines.*
7+
import kotlinx.coroutines.exceptions.*
8+
import kotlin.test.*
9+
10+
class LimitedParallelismConcurrentTest : TestBase() {
11+
12+
private val targetParallelism = 4
13+
private val iterations = 100_000
14+
private val parallelism = atomic(0)
15+
16+
private fun checkParallelism() {
17+
val value = parallelism.incrementAndGet()
18+
randomWait()
19+
assertTrue { value <= targetParallelism }
20+
parallelism.decrementAndGet()
21+
}
22+
23+
@Test
24+
fun testLimitedExecutor() = runMtTest {
25+
val executor = newFixedThreadPoolContext(targetParallelism, "test")
26+
val view = executor.limitedParallelism(targetParallelism)
27+
doStress {
28+
repeat(iterations) {
29+
launch(view) {
30+
checkParallelism()
31+
}
32+
}
33+
}
34+
executor.close()
35+
}
36+
37+
private suspend inline fun doStress(crossinline block: suspend CoroutineScope.() -> Unit) {
38+
repeat(stressTestMultiplier) {
39+
coroutineScope {
40+
block()
41+
}
42+
}
43+
}
44+
45+
@Test
46+
fun testTaskFairness() = runMtTest {
47+
val executor = newSingleThreadContext("test")
48+
val view = executor.limitedParallelism(1)
49+
val view2 = executor.limitedParallelism(1)
50+
val j1 = launch(view) {
51+
while (true) {
52+
yield()
53+
}
54+
}
55+
val j2 = launch(view2) { j1.cancel() }
56+
joinAll(j1, j2)
57+
executor.close()
58+
}
59+
}

kotlinx-coroutines-core/jvm/test/LimitedParallelismTest.kt renamed to kotlinx-coroutines-core/jvm/test/LimitedParallelismUnhandledExceptionTest.kt

Lines changed: 1 addition & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -9,30 +9,7 @@ import java.util.concurrent.*
99
import kotlin.coroutines.*
1010
import kotlin.test.*
1111

12-
class LimitedParallelismTest : TestBase() {
13-
14-
@Test
15-
fun testParallelismSpec() {
16-
assertFailsWith<IllegalArgumentException> { Dispatchers.Default.limitedParallelism(0) }
17-
assertFailsWith<IllegalArgumentException> { Dispatchers.Default.limitedParallelism(-1) }
18-
assertFailsWith<IllegalArgumentException> { Dispatchers.Default.limitedParallelism(Int.MIN_VALUE) }
19-
Dispatchers.Default.limitedParallelism(Int.MAX_VALUE)
20-
}
21-
22-
@Test
23-
fun testTaskFairness() = runTest {
24-
val executor = newSingleThreadContext("test")
25-
val view = executor.limitedParallelism(1)
26-
val view2 = executor.limitedParallelism(1)
27-
val j1 = launch(view) {
28-
while (true) {
29-
yield()
30-
}
31-
}
32-
val j2 = launch(view2) { j1.cancel() }
33-
joinAll(j1, j2)
34-
executor.close()
35-
}
12+
class LimitedParallelismUnhandledExceptionTest : TestBase() {
3613

3714
@Test
3815
fun testUnhandledException() = runTest {

0 commit comments

Comments
 (0)