Skip to content

Commit 6dfbc94

Browse files
committed
[instrumentation-rd]
removed count down latches
1 parent 5d77c28 commit 6dfbc94

File tree

7 files changed

+147
-140
lines changed

7 files changed

+147
-140
lines changed

utbot-instrumentation/src/main/kotlin/org/utbot/instrumentation/ConcreteExecutor.kt

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import org.utbot.instrumentation.rd.UtInstrumentationProcess
1919
import org.utbot.instrumentation.rd.UtRdLoggerFactory
2020
import org.utbot.instrumentation.rd.generated.InvokeMethodCommandParams
2121
import org.utbot.instrumentation.util.ChildProcessError
22-
import org.utbot.rd.terminateOnException
2322
import java.io.Closeable
2423
import java.util.concurrent.atomic.AtomicLong
2524
import kotlin.concurrent.thread
@@ -101,7 +100,7 @@ class ConcreteExecutor<TIResult, TInstrumentation : Instrumentation<TIResult>> p
101100
internal val pathsToUserClasses: String,
102101
internal val pathsToDependencyClasses: String
103102
) : Closeable, Executor<TIResult> {
104-
private val def: LifetimeDefinition = LifetimeDefinition()
103+
private val ldef: LifetimeDefinition = LifetimeDefinition()
105104
private val childProcessRunner: ChildProcessRunner = ChildProcessRunner()
106105

107106
companion object {
@@ -141,32 +140,30 @@ class ConcreteExecutor<TIResult, TInstrumentation : Instrumentation<TIResult>> p
141140

142141
//property that signals to executors pool whether it can reuse this executor or not
143142
val alive: Boolean
144-
get() = def.isAlive
143+
get() = ldef.isAlive
145144

146145
private val corMutex = Mutex()
147146
private var processInstance: UtInstrumentationProcess? = null
148147

149148
// this function is intended to be called under corMutex
150149
private suspend fun regenerate(): UtInstrumentationProcess {
150+
ldef.throwIfNotAlive()
151151

152-
def.throwIfNotAlive()
153-
var proc : UtInstrumentationProcess? = processInstance
152+
var proc: UtInstrumentationProcess? = processInstance
154153

155154
if (proc == null || !proc.lifetime.isAlive) {
156-
def.createNested().terminateOnException {
157-
proc = UtInstrumentationProcess(
158-
it,
159-
childProcessRunner,
160-
instrumentation,
161-
pathsToUserClasses,
162-
pathsToDependencyClasses,
163-
classLoader
164-
)
165-
processInstance = proc
166-
}
155+
proc = UtInstrumentationProcess(
156+
ldef,
157+
childProcessRunner,
158+
instrumentation,
159+
pathsToUserClasses,
160+
pathsToDependencyClasses,
161+
classLoader
162+
)
163+
processInstance = proc
167164
}
168165

169-
return proc!!
166+
return proc
170167
}
171168

172169
/**
@@ -271,7 +268,7 @@ class ConcreteExecutor<TIResult, TInstrumentation : Instrumentation<TIResult>> p
271268
} catch (_: Exception) {}
272269
processInstance = null
273270
}
274-
def.terminate()
271+
ldef.terminate()
275272
}
276273
}
277274
}

utbot-instrumentation/src/main/kotlin/org/utbot/instrumentation/process/ChildProcess.kt

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,14 @@ import org.utbot.instrumentation.rd.processSyncDirectory
2424
import org.utbot.instrumentation.rd.signalChildReady
2525
import org.utbot.instrumentation.util.KryoHelper
2626
import org.utbot.rd.UtSingleThreadScheduler
27+
import org.utbot.rd.adviseForConditionAsync
2728
import java.io.File
2829
import java.io.OutputStream
2930
import java.io.PrintStream
3031
import java.net.URLClassLoader
3132
import java.security.AllPermission
3233
import java.time.LocalDateTime
3334
import java.time.format.DateTimeFormatter
34-
import java.util.concurrent.CountDownLatch
35-
import java.util.concurrent.TimeUnit
3635
import java.util.concurrent.atomic.AtomicLong
3736
import kotlin.system.measureTimeMillis
3837

@@ -264,20 +263,20 @@ private suspend fun initiate(lifetime: Lifetime, port: Int, pid: Int) {
264263
signalChildReady(pid)
265264
logInfo { "IO obtained" }
266265

267-
val latch = CountDownLatch(1)
268-
sync.advise(lifetime) {
266+
val answerFromMainProcess = sync.adviseForConditionAsync(lifetime) {
269267
if (it == "main") {
270268
sync.fire("child")
271-
latch.countDown()
269+
true
270+
} else {
271+
false
272272
}
273273
}
274274

275-
if (latch.await(messageFromMainTimeoutMillis.toLong(), TimeUnit.MILLISECONDS)) {
275+
try {
276+
answerFromMainProcess.await()
276277
logInfo { "starting instrumenting" }
277-
try {
278-
deferred.await()
279-
} catch (e: Throwable) {
280-
logError { "Terminating process because exception occurred: ${e.stackTraceToString()}" }
281-
}
278+
deferred.await()
279+
} catch (e: Throwable) {
280+
logError { "Terminating process because exception occurred: ${e.stackTraceToString()}" }
282281
}
283282
}

utbot-instrumentation/src/main/kotlin/org/utbot/instrumentation/rd/InstrumentationIO.kt

Lines changed: 8 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -7,27 +7,20 @@ import com.jetbrains.rd.util.lifetime.Lifetime
77
import org.utbot.common.utBotTempDirectory
88
import org.utbot.instrumentation.rd.generated.ProtocolModel
99
import org.utbot.instrumentation.rd.generated.protocolModel
10+
import org.utbot.rd.pump
1011
import java.io.File
11-
import java.util.concurrent.CountDownLatch
12-
import java.util.concurrent.TimeUnit
1312

1413
const val rdProcessDirName = "rdProcessSync"
1514
val processSyncDirectory = File(utBotTempDirectory.toFile(), rdProcessDirName)
16-
private val awaitTimeoutMillis: Long = 120 * 1000
1715

18-
internal fun obtainClientIO(lifetime: Lifetime, protocol: Protocol): Pair<RdSignal<String>, ProtocolModel> {
19-
val latch = CountDownLatch(2)
20-
val sync = RdSignal<String>().static(1).init(lifetime, protocol, latch)
21-
22-
protocol.scheduler.invokeOrQueue {
23-
protocol.protocolModel
24-
latch.countDown()
16+
internal suspend fun obtainClientIO(lifetime: Lifetime, protocol: Protocol): Pair<RdSignal<String>, ProtocolModel> {
17+
return protocol.scheduler.pump(lifetime) {
18+
val sync = RdSignal<String>().static(1).apply {
19+
async = true
20+
bind(lifetime, protocol, rdid.toString())
21+
}
22+
sync to protocol.protocolModel
2523
}
26-
27-
if (!latch.await(awaitTimeoutMillis, TimeUnit.MILLISECONDS))
28-
throw IllegalStateException("Cannot bind signals")
29-
30-
return sync to protocol.protocolModel
3124
}
3225

3326
internal fun childCreatedFileName(pid: Int): String {
@@ -48,14 +41,4 @@ internal fun signalChildReady(pid: Int) {
4841
if (!created) {
4942
throw IllegalStateException("cannot create signal file")
5043
}
51-
}
52-
53-
private fun <T> RdSignal<T>.init(lifetime: Lifetime, protocol: Protocol, latch: CountDownLatch): RdSignal<T> {
54-
return this.apply {
55-
async = true
56-
protocol.scheduler.invokeOrQueue {
57-
this.bind(lifetime, protocol, rdid.toString())
58-
latch.countDown()
59-
}
60-
}
6144
}

utbot-instrumentation/src/main/kotlin/org/utbot/instrumentation/rd/UtInstrumentationProcess.kt

Lines changed: 30 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import com.jetbrains.rd.framework.base.static
44
import com.jetbrains.rd.framework.impl.RdSignal
55
import com.jetbrains.rd.util.lifetime.Lifetime
66
import com.jetbrains.rd.util.lifetime.isAlive
7-
import kotlinx.coroutines.CompletableDeferred
87
import kotlinx.coroutines.delay
98
import mu.KotlinLogging
109
import org.utbot.instrumentation.instrumentation.Instrumentation
@@ -14,8 +13,7 @@ import org.utbot.instrumentation.rd.generated.ProtocolModel
1413
import org.utbot.instrumentation.rd.generated.SetInstrumentationParams
1514
import org.utbot.instrumentation.rd.generated.protocolModel
1615
import org.utbot.instrumentation.util.KryoHelper
17-
import org.utbot.rd.ProcessWithRdServer
18-
import org.utbot.rd.startUtProcessWithRdServer
16+
import org.utbot.rd.*
1917
import java.io.File
2018
import java.nio.file.Files
2119
import java.util.concurrent.atomic.AtomicBoolean
@@ -40,20 +38,13 @@ class UtInstrumentationProcess private constructor(
4038
get() = protocol.protocolModel
4139

4240
private suspend fun init(): UtInstrumentationProcess {
43-
lifetime.usingNested { operation ->
44-
val bound = CompletableDeferred<Boolean>()
45-
46-
protocol.scheduler.invokeOrQueue {
47-
sync.bind(lifetime, protocol, sync.rdid.toString())
48-
protocol.protocolModel
49-
bound.complete(true)
50-
}
51-
operation.onTermination { bound.cancel() }
52-
bound.await()
41+
protocol.scheduler.pump(lifetime) {
42+
sync.bind(lifetime, protocol, sync.rdid.toString())
43+
protocol.protocolModel
5344
}
5445
processSyncDirectory.mkdirs()
55-
val pid = process.toHandle().pid().toInt()
5646

47+
val pid = process.toHandle().pid().toInt()
5748
val syncFile = File(processSyncDirectory, childCreatedFileName(pid))
5849

5950
while (lifetime.isAlive) {
@@ -65,18 +56,11 @@ class UtInstrumentationProcess private constructor(
6556
delay(fileWaitTimeoutMillis)
6657
}
6758

68-
lifetime.usingNested { syncLifetime ->
69-
val childReady = AtomicBoolean(false)
70-
sync.advise(syncLifetime) {
71-
if (it == "child") {
72-
childReady.set(true)
73-
}
74-
}
59+
val messageFromChild = sync.adviseForConditionAsync(lifetime) { it == "child" }
7560

76-
while (!childReady.get()) {
77-
sync.fire("main")
78-
delay(10)
79-
}
61+
while(messageFromChild.isActive) {
62+
sync.fire("main")
63+
delay(10)
8064
}
8165

8266
lifetime.onTermination {
@@ -90,16 +74,16 @@ class UtInstrumentationProcess private constructor(
9074
}
9175

9276
companion object {
93-
suspend operator fun <TIResult, TInstrumentation : Instrumentation<TIResult>> invoke(
94-
parent: Lifetime,
77+
private suspend fun <TIResult, TInstrumentation : Instrumentation<TIResult>> invokeImpl(
78+
lifetime: Lifetime,
9579
childProcessRunner: ChildProcessRunner,
9680
instrumentation: TInstrumentation,
9781
pathsToUserClasses: String,
9882
pathsToDependencyClasses: String,
9983
classLoader: ClassLoader?
10084
): UtInstrumentationProcess {
10185
val rdProcess: ProcessWithRdServer = startUtProcessWithRdServer(
102-
parent = parent
86+
lifetime = lifetime
10387
) {
10488
childProcessRunner.start(it)
10589
}
@@ -131,5 +115,23 @@ class UtInstrumentationProcess private constructor(
131115

132116
return proc
133117
}
118+
119+
suspend operator fun <TIResult, TInstrumentation : Instrumentation<TIResult>> invoke(
120+
lifetime: Lifetime,
121+
childProcessRunner: ChildProcessRunner,
122+
instrumentation: TInstrumentation,
123+
pathsToUserClasses: String,
124+
pathsToDependencyClasses: String,
125+
classLoader: ClassLoader?
126+
): UtInstrumentationProcess = lifetime.createNested().terminateOnException {
127+
invokeImpl(
128+
it,
129+
childProcessRunner,
130+
instrumentation,
131+
pathsToUserClasses,
132+
pathsToDependencyClasses,
133+
classLoader
134+
)
135+
}
134136
}
135137
}

utbot-rd/src/main/kotlin/org/utbot/rd/LifetimedProcess.kt

Lines changed: 28 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,36 +8,35 @@ import java.util.concurrent.TimeUnit
88

99
/**
1010
* Creates LifetimedProcess.
11-
* If provided parent lifetime already terminated - throws CancellationException and process is not started.
11+
* If provided lifetime already terminated - throws CancellationException and process is not started.
1212
*/
13-
fun startLifetimedProcess(cmd: List<String>, parent: Lifetime? = null): LifetimedProcess {
14-
parent?.throwIfNotAlive()
13+
fun startLifetimedProcess(cmd: List<String>, lifetime: Lifetime? = null): LifetimedProcess {
14+
lifetime?.throwIfNotAlive()
1515

16-
return ProcessBuilder(cmd).start().toLifetimedProcess(parent)
16+
return ProcessBuilder(cmd).start().toLifetimedProcess(lifetime)
1717
}
1818

1919
/**
2020
* Creates LifetimedProcess from already running process.
2121
*
22-
* Process will be terminated if parent lifetime is terminated.
22+
* Process will be terminated if provided lifetime is terminated.
2323
*/
24-
fun Process.toLifetimedProcess(parent: Lifetime? = null): LifetimedProcess {
25-
return LifetimedProcessIml(this, parent)
24+
fun Process.toLifetimedProcess(lifetime: Lifetime? = null): LifetimedProcess {
25+
return LifetimedProcessIml(this, lifetime)
2626
}
2727

2828
/**
2929
* Main class goals
30-
* 1. lifetime terminates when process does
31-
* 2. process terminates when lifetime does
32-
* 3. optionally binding lifetime to parent scope
30+
* 1. if process terminates - lifetime terminates
31+
* 2. if lifetime terminates - process terminates
3332
*/
3433
interface LifetimedProcess {
3534
val lifetime: Lifetime
3635
val process: Process
3736
fun terminate()
3837
}
3938

40-
inline fun <T> LifetimedProcess.use(block: (LifetimedProcess) -> T): T {
39+
inline fun <T, R: LifetimedProcess> R.use(block: (R) -> T): T {
4140
try {
4241
return block(this)
4342
}
@@ -46,33 +45,43 @@ inline fun <T> LifetimedProcess.use(block: (LifetimedProcess) -> T): T {
4645
}
4746
}
4847

48+
inline fun <T, R: LifetimedProcess> R.terminateOnException(block: (R) -> T): T {
49+
try {
50+
return block(this)
51+
}
52+
catch(e: Throwable) {
53+
terminate()
54+
throw e
55+
}
56+
}
57+
4958
const val processKillTimeoutMillis = 100L
5059
const val checkProcessAliveDelay = 100L
5160

52-
class LifetimedProcessIml(override val process: Process, parent: Lifetime? = null): LifetimedProcess {
53-
private val def: LifetimeDefinition
61+
class LifetimedProcessIml(override val process: Process, lifetime: Lifetime? = null): LifetimedProcess {
62+
private val ldef: LifetimeDefinition
5463

5564
override val lifetime
56-
get() = def.lifetime
65+
get() = ldef.lifetime
5766

5867
init {
59-
def = parent?.createNested() ?: LifetimeDefinition()
60-
def.onTermination {
68+
ldef = lifetime?.createNested() ?: LifetimeDefinition()
69+
ldef.onTermination {
6170
process.destroy()
6271

6372
if (process.waitFor(processKillTimeoutMillis, TimeUnit.MILLISECONDS))
6473
process.destroyForcibly()
6574
}
66-
UtRdCoroutineScope.current.launch(def) {
75+
UtRdCoroutineScope.current.launch(ldef) {
6776
while (process.isAlive) {
6877
delay(checkProcessAliveDelay)
6978
}
7079

71-
def.terminate()
80+
ldef.terminate()
7281
}
7382
}
7483

7584
override fun terminate() {
76-
def.terminate()
85+
ldef.terminate()
7786
}
7887
}

0 commit comments

Comments
 (0)