Skip to content

Commit 4f68777

Browse files
committed
[utbot-rd]
deadlock fix
1 parent 6573b78 commit 4f68777

File tree

3 files changed

+59
-63
lines changed

3 files changed

+59
-63
lines changed

utbot-framework/src/main/kotlin/org/utbot/framework/process/EngineMain.kt

Lines changed: 22 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.utbot.framework.process
22

33
import com.jetbrains.rd.util.Logger
4+
import com.jetbrains.rd.util.info
45
import com.jetbrains.rd.util.lifetime.Lifetime
56
import com.jetbrains.rd.util.lifetime.LifetimeDefinition
67
import kotlinx.coroutines.runBlocking
@@ -36,46 +37,44 @@ import kotlin.time.Duration.Companion.seconds
3637
private val messageFromMainTimeoutMillis = 120.seconds
3738
private val logger = KotlinLogging.logger {}
3839

39-
// use log4j2.configurationFile property to set log4j configuration
40-
suspend fun main(args: Array<String>) = runBlocking {
41-
// 0 - auto port for server, should not be used here
42-
val port = findRdPort(args)
43-
val ldef = LifetimeDefinition()
44-
val kryoHelper = KryoHelper(ldef)
45-
46-
kryoHelper.addInstantiator(soot.util.HashChain::class.java) {
40+
private fun KryoHelper.setup(): KryoHelper = this.apply {
41+
addInstantiator(soot.util.HashChain::class.java) {
4742
HashChain<Any>()
4843
}
49-
kryoHelper.addInstantiator(soot.UnitPatchingChain::class.java) {
44+
addInstantiator(soot.UnitPatchingChain::class.java) {
5045
UnitPatchingChain(HashChain())
5146
}
52-
kryoHelper.addInstantiator(Collections.synchronizedCollection(mutableListOf<SootMethod>()).javaClass) {
47+
addInstantiator(Collections.synchronizedCollection(mutableListOf<SootMethod>()).javaClass) {
5348
Collections.synchronizedCollection(mutableListOf<SootMethod>())
5449
}
55-
kryoHelper.addInstantiator(Collections.synchronizedCollection(mutableListOf<Any>()).javaClass) {
50+
addInstantiator(Collections.synchronizedCollection(mutableListOf<Any>()).javaClass) {
5651
Collections.synchronizedCollection(mutableListOf<Any>())
5752
}
53+
}
54+
55+
// use log4j2.configurationFile property to set log4j configuration
56+
suspend fun main(args: Array<String>) = runBlocking {
57+
// 0 - auto port for server, should not be used here
58+
val port = findRdPort(args)
5859

5960
Logger.set(Lifetime.Eternal, UtRdKLoggerFactory(logger))
6061

61-
ClientProtocolBuilder().withProtocolTimeout(messageFromMainTimeoutMillis).start(ldef, port) {
62+
ClientProtocolBuilder().withProtocolTimeout(messageFromMainTimeoutMillis).start(port) {
6263
settingsModel
6364
AbstractSettings.setupFactory(RdSettingsContainerFactory(protocol))
64-
engineProcessModel.setup(ldef, kryoHelper, it) {
65-
ldef.terminate()
66-
}
67-
}
65+
val kryoHelper = KryoHelper(lifetime).setup()
6866

69-
ldef.awaitTermination()
67+
engineProcessModel.setup(kryoHelper, it)
68+
}
69+
logger.info { "runBlocking ending" }
70+
}.also {
71+
logger.info { "runBlocking ended" }
7072
}
71-
7273
private lateinit var testGenerator: TestCaseGenerator
7374

7475
private fun EngineProcessModel.setup(
75-
lifetime: Lifetime,
7676
kryoHelper: KryoHelper,
77-
synchronizer: CallsSynchronizer,
78-
onStop: () -> Unit
77+
synchronizer: CallsSynchronizer
7978
) {
8079
val model = this
8180
synchronizer.measureExecutionForTermination(setupUtContext) { params ->
@@ -91,7 +90,7 @@ private fun EngineProcessModel.setup(
9190
jdkInfo = JdkInfo(Paths.get(params.jdkInfo.path), params.jdkInfo.version),
9291
isCanceled = {
9392
runBlocking {
94-
model.isCancelled.startSuspending(lifetime, Unit)
93+
model.isCancelled.startSuspending(Unit)
9594
}
9695
})
9796
}
@@ -162,5 +161,5 @@ private fun EngineProcessModel.setup(
162161
)
163162
)
164163
}
165-
synchronizer.measureExecutionForTermination(stopProcess) { onStop() }
164+
synchronizer.measureExecutionForTermination(stopProcess) { synchronizer.stopProtocol() }
166165
}

utbot-instrumentation/src/main/kotlin/org/utbot/instrumentation/process/ChildProcess.kt

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -83,32 +83,27 @@ suspend fun main(args: Array<String>) = runBlocking {
8383

8484
Logger.set(Lifetime.Eternal, UtRdConsoleLoggerFactory(defaultLogLevel, System.err))
8585
val port = findRdPort(args)
86-
val ldef = LifetimeDefinition()
87-
val kryoHelper = KryoHelper(ldef)
88-
logger.info { "kryo created" }
89-
90-
91-
ClientProtocolBuilder().withProtocolTimeout(messageFromMainTimeout).start(ldef, port) {
92-
logger.info { "setup started" }
93-
childProcessModel.setup(kryoHelper, it) {
94-
ldef.terminate()
95-
}
96-
logger.info { "setup ended" }
97-
}
98-
logger.info { "client started" }
9986

10087
try {
101-
ldef.awaitTermination()
88+
ClientProtocolBuilder().withProtocolTimeout(messageFromMainTimeout).start(port) {
89+
val kryoHelper = KryoHelper(lifetime)
90+
logger.info { "setup started" }
91+
childProcessModel.setup(kryoHelper, it)
92+
logger.info { "setup ended" }
93+
}
10294
} catch (e: Throwable) {
10395
logger.error { "Terminating process because exception occurred: ${e.stackTraceToString()}" }
10496
}
97+
logger.info { "runBlocking ending" }
98+
}.also {
99+
logger.info { "runBlocking ended" }
105100
}
106101

107102
private lateinit var pathsToUserClasses: Set<String>
108103
private lateinit var pathsToDependencyClasses: Set<String>
109104
private lateinit var instrumentation: Instrumentation<*>
110105

111-
private fun ChildProcessModel.setup(kryoHelper: KryoHelper, synchronizer: CallsSynchronizer, onStop: () -> Unit) {
106+
private fun ChildProcessModel.setup(kryoHelper: KryoHelper, synchronizer: CallsSynchronizer) {
112107
synchronizer.measureExecutionForTermination(warmup) {
113108
logger.debug { "received warmup request" }
114109
val time = measureTimeMillis {
@@ -152,7 +147,7 @@ private fun ChildProcessModel.setup(kryoHelper: KryoHelper, synchronizer: CallsS
152147
UtContext.setUtContext(UtContext(HandlerClassesLoader))
153148
}
154149
synchronizer.measureExecutionForTermination(stopProcess) { logger.debug { "stop request" }
155-
onStop()
150+
synchronizer.stopProtocol()
156151
}
157152
synchronizer.measureExecutionForTermination(collectCoverage) { params ->
158153
logger.debug { "collect coverage request" }

utbot-rd/src/main/kotlin/org/utbot/rd/ClientProcessUtil.kt

Lines changed: 26 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -55,19 +55,19 @@ fun findRdPort(args: Array<String>): Int {
5555
?: throw IllegalArgumentException("No port provided")
5656
}
5757

58-
class CallsSynchronizer(val timeout: Duration) {
58+
class CallsSynchronizer(private val ldef: LifetimeDefinition, val timeout: Duration) {
5959
private enum class State {
6060
STARTED,
6161
ENDED
6262
}
63+
6364
private val synchronizer: Channel<State> = Channel(1)
6465

6566
fun <T> measureExecutionForTermination(block: () -> T): T = runBlocking {
6667
try {
6768
synchronizer.send(State.STARTED)
6869
return@runBlocking block()
69-
}
70-
finally {
70+
} finally {
7171
synchronizer.send(State.ENDED)
7272
}
7373
}
@@ -82,7 +82,7 @@ class CallsSynchronizer(val timeout: Duration) {
8282
}
8383
}
8484

85-
suspend fun setupTimeout(ldef: LifetimeDefinition) {
85+
suspend fun setupTimeout() {
8686
ldef.launch {
8787
var lastState = State.ENDED
8888
while (ldef.isAlive) {
@@ -93,8 +93,8 @@ class CallsSynchronizer(val timeout: Duration) {
9393
if (current == null) {
9494
if (lastState == State.ENDED) {
9595
// process is waiting for command more than expected, better die
96-
logger.info { "terminating lifetime" }
97-
ldef.terminate()
96+
logger.info { "terminating lifetime by timeout" }
97+
stopProtocol()
9898
break
9999
}
100100
} else {
@@ -103,28 +103,31 @@ class CallsSynchronizer(val timeout: Duration) {
103103
}
104104
}
105105
}
106+
107+
fun stopProtocol() {
108+
ldef.terminate()
109+
}
106110
}
107111

108112
class ClientProtocolBuilder {
109113
private var timeout = Duration.INFINITE
110114

111-
suspend fun start(parent: Lifetime, port: Int, bindables: Protocol.(CallsSynchronizer) -> Unit) {
115+
suspend fun start(port: Int, parent: Lifetime? = null, bindables: Protocol.(CallsSynchronizer) -> Unit) {
112116
val pid = currentProcessPid.toInt()
113-
val ldef = parent.createNested()
114-
115-
ldef += { logger.info { "lifetime terminated" } }
116-
ldef += {
117-
val syncFile = File(processSyncDirectory, childCreatedFileName(port))
118-
119-
if (syncFile.exists()) {
120-
logger.info { "sync file existed" }
121-
syncFile.delete()
117+
val ldef = parent?.createNested() ?: LifetimeDefinition()
118+
ldef.terminateOnException { _ ->
119+
ldef += { logger.info { "lifetime terminated" } }
120+
ldef += {
121+
val syncFile = File(processSyncDirectory, childCreatedFileName(port))
122+
123+
if (syncFile.exists()) {
124+
logger.info { "sync file existed" }
125+
syncFile.delete()
126+
}
122127
}
123-
}
124-
logger.info { "pid - $pid, port - $port" }
125-
logger.info { "isJvm8 - $isJvm8, isJvm9Plus - $isJvm9Plus, isWindows - $isWindows" }
128+
logger.info { "pid - $pid, port - $port" }
129+
logger.info { "isJvm8 - $isJvm8, isJvm9Plus - $isJvm9Plus, isWindows - $isWindows" }
126130

127-
try {
128131
val name = "Client$port"
129132
val rdClientProtocolScheduler = SingleThreadScheduler(ldef, "Scheduler for $name")
130133
val clientProtocol = Protocol(
@@ -135,9 +138,9 @@ class ClientProtocolBuilder {
135138
SocketWire.Client(ldef, rdClientProtocolScheduler, port),
136139
ldef
137140
)
138-
val synchronizer = CallsSynchronizer(timeout)
141+
val synchronizer = CallsSynchronizer(ldef, timeout)
139142

140-
synchronizer.setupTimeout(ldef)
143+
synchronizer.setupTimeout()
141144
rdClientProtocolScheduler.pump(ldef) {
142145
clientProtocol.synchronizationModel
143146
clientProtocol.bindables(synchronizer)
@@ -159,8 +162,7 @@ class ClientProtocolBuilder {
159162
}
160163
answerFromMainProcess.await()
161164
}
162-
} catch (e: Exception) {
163-
ldef.terminate()
165+
ldef.awaitTermination()
164166
}
165167
}
166168

0 commit comments

Comments
 (0)