Skip to content

Commit a0c8b82

Browse files
committed
Clone scala/sys/process/ProcessBuilderImpl.scala
1 parent c36bd1f commit a0c8b82

File tree

1 file changed

+274
-0
lines changed

1 file changed

+274
-0
lines changed
Lines changed: 274 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,274 @@
1+
/*
2+
* Scala (https://www.scala-lang.org)
3+
*
4+
* Copyright EPFL and Lightbend, Inc.
5+
*
6+
* Licensed under Apache License 2.0
7+
* (http://www.apache.org/licenses/LICENSE-2.0).
8+
*
9+
* See the NOTICE file distributed with this work for
10+
* additional information regarding copyright ownership.
11+
*/
12+
13+
package scala
14+
package sys
15+
package process
16+
17+
import processInternal._
18+
import Process._
19+
import BasicIO.{LazilyListed, Streamed, Uncloseable}
20+
import Uncloseable.protect
21+
22+
import java.io.{FileInputStream, FileOutputStream}
23+
import java.util.concurrent.LinkedBlockingQueue
24+
25+
import scala.util.control.NonFatal
26+
27+
private[process] trait ProcessBuilderImpl {
28+
self: ProcessBuilder.type =>
29+
30+
private[process] class DaemonBuilder(underlying: ProcessBuilder) extends AbstractBuilder {
31+
final def run(io: ProcessIO): Process = underlying.run(io.daemonized())
32+
}
33+
34+
private[process] class Dummy(override val toString: String, exitValue: => Int) extends AbstractBuilder {
35+
override def run(io: ProcessIO): Process = new DummyProcess(exitValue)
36+
override def canPipeTo = true
37+
}
38+
39+
private[process] class URLInput(url: URL) extends IStreamBuilder(url.openStream(), url.toString)
40+
private[process] class FileInput(file: File) extends IStreamBuilder(new FileInputStream(file), file.getAbsolutePath)
41+
private[process] class FileOutput(file: File, append: Boolean) extends OStreamBuilder(new FileOutputStream(file, append), file.getAbsolutePath)
42+
43+
private[process] class OStreamBuilder(
44+
stream: => OutputStream,
45+
label: String
46+
) extends ThreadBuilder(label, _ writeInput protect(stream)) {
47+
override def hasExitValue = false
48+
}
49+
50+
private[process] class IStreamBuilder(
51+
stream: => InputStream,
52+
label: String
53+
) extends ThreadBuilder(label, _ processOutput protect(stream)) {
54+
override def hasExitValue = false
55+
}
56+
57+
private[process] abstract class ThreadBuilder(
58+
override val toString: String,
59+
runImpl: ProcessIO => Unit
60+
) extends AbstractBuilder {
61+
62+
override def run(io: ProcessIO): Process = {
63+
val success = new LinkedBlockingQueue[Boolean](1)
64+
def go(): Unit = {
65+
var ok = false
66+
try {
67+
runImpl(io)
68+
ok = true
69+
} finally success.put(ok)
70+
}
71+
val t = Spawn("ThreadProcess", io.daemonizeThreads)(go())
72+
new ThreadProcess(t, success)
73+
}
74+
}
75+
76+
/** Represents a simple command without any redirection or combination. */
77+
private[process] class Simple(p: JProcessBuilder) extends AbstractBuilder {
78+
override def run(io: ProcessIO): Process = {
79+
import java.lang.ProcessBuilder.Redirect.{INHERIT => Inherit}
80+
import io.{daemonizeThreads, processError, processOutput, writeInput}
81+
82+
val inherit = writeInput eq BasicIO.connectToStdIn
83+
if (inherit) p.redirectInput(Inherit)
84+
85+
val process = p.start() // start the external process
86+
87+
// spawn threads that process the input, output, and error streams using the functions defined in `io`
88+
val inThread =
89+
if (inherit || (writeInput eq BasicIO.connectNoOp)) null
90+
else Spawn("Simple-input", daemon = true)(writeInput(process.getOutputStream))
91+
val outThread = Spawn("Simple-output", daemonizeThreads)(processOutput(process.getInputStream()))
92+
val errorThread =
93+
if (p.redirectErrorStream) Nil
94+
else List(Spawn("Simple-error", daemonizeThreads)(processError(process.getErrorStream())))
95+
96+
new SimpleProcess(process, inThread, outThread :: errorThread)
97+
}
98+
override def toString = p.command.toString
99+
override def canPipeTo = true
100+
}
101+
102+
private[scala] abstract class AbstractBuilder extends ProcessBuilder with Sink with Source {
103+
protected def toSource = this
104+
protected def toSink = this
105+
106+
private[this] val defaultStreamCapacity = 4096
107+
108+
def #|(other: ProcessBuilder): ProcessBuilder = {
109+
require(other.canPipeTo, "Piping to multiple processes is not supported.")
110+
new PipedBuilder(this, other, false)
111+
}
112+
def #||(other: ProcessBuilder): ProcessBuilder = new OrBuilder(this, other)
113+
def #&&(other: ProcessBuilder): ProcessBuilder = new AndBuilder(this, other)
114+
def ###(other: ProcessBuilder): ProcessBuilder = new SequenceBuilder(this, other)
115+
116+
def run(): Process = run(connectInput = false)
117+
def run(connectInput: Boolean): Process = run(BasicIO.standard(connectInput))
118+
def run(log: ProcessLogger): Process = run(log, connectInput = false)
119+
def run(log: ProcessLogger, connectInput: Boolean): Process = run(BasicIO(connectInput, log))
120+
121+
def !! = slurp(None, withIn = false)
122+
def !!(log: ProcessLogger) = slurp(Some(log), withIn = false)
123+
def !!< = slurp(None, withIn = true)
124+
def !!<(log: ProcessLogger) = slurp(Some(log), withIn = true)
125+
126+
def lazyLines: LazyList[String] = lazyLines(withInput = false, nonZeroException = true, None, defaultStreamCapacity)
127+
def lazyLines(log: ProcessLogger): LazyList[String] = lazyLines(withInput = false, nonZeroException = true, Some(log), defaultStreamCapacity)
128+
def lazyLines_! : LazyList[String] = lazyLines(withInput = false, nonZeroException = false, None, defaultStreamCapacity)
129+
def lazyLines_!(log: ProcessLogger): LazyList[String] = lazyLines(withInput = false, nonZeroException = false, Some(log), defaultStreamCapacity)
130+
def lazyLines(capacity: Integer): LazyList[String] = lazyLines(withInput = false, nonZeroException = true, None, capacity)
131+
def lazyLines(log: ProcessLogger, capacity: Integer): LazyList[String] = lazyLines(withInput = false, nonZeroException = true, Some(log), capacity)
132+
def lazyLines_!(capacity: Integer) : LazyList[String] = lazyLines(withInput = false, nonZeroException = false, None, capacity)
133+
def lazyLines_!(log: ProcessLogger, capacity: Integer): LazyList[String] = lazyLines(withInput = false, nonZeroException = false, Some(log), capacity)
134+
135+
@deprecated("internal", since = "2.13.4") def lineStream: Stream[String] = lineStream(withInput = false, nonZeroException = true, None, defaultStreamCapacity)
136+
@deprecated("internal", since = "2.13.4") def lineStream(log: ProcessLogger): Stream[String] = lineStream(withInput = false, nonZeroException = true, Some(log), defaultStreamCapacity)
137+
@deprecated("internal", since = "2.13.4") def lineStream_! : Stream[String] = lineStream(withInput = false, nonZeroException = false, None, defaultStreamCapacity)
138+
@deprecated("internal", since = "2.13.4") def lineStream_!(log: ProcessLogger): Stream[String] = lineStream(withInput = false, nonZeroException = false, Some(log), defaultStreamCapacity)
139+
@deprecated("internal", since = "2.13.4") def lineStream(capacity: Integer): Stream[String] = lineStream(withInput = false, nonZeroException = true, None, capacity)
140+
@deprecated("internal", since = "2.13.4") def lineStream(log: ProcessLogger, capacity: Integer): Stream[String] = lineStream(withInput = false, nonZeroException = true, Some(log), capacity)
141+
@deprecated("internal", since = "2.13.4") def lineStream_!(capacity: Integer) : Stream[String] = lineStream(withInput = false, nonZeroException = false, None, capacity)
142+
@deprecated("internal", since = "2.13.4") def lineStream_!(log: ProcessLogger, capacity: Integer): Stream[String] = lineStream(withInput = false, nonZeroException = false, Some(log), capacity)
143+
144+
def ! = run(connectInput = false).exitValue()
145+
def !(io: ProcessIO) = run(io).exitValue()
146+
def !(log: ProcessLogger) = runBuffered(log, connectInput = false)
147+
def !< = run(connectInput = true).exitValue()
148+
def !<(log: ProcessLogger) = runBuffered(log, connectInput = true)
149+
150+
/** Constructs a new builder which runs this command with all input/output threads marked
151+
* as daemon threads. This allows the creation of a long running process while still
152+
* allowing the JVM to exit normally.
153+
*
154+
* Note: not in the public API because it's not fully baked, but I need the capability
155+
* for fsc.
156+
*/
157+
def daemonized(): ProcessBuilder = new DaemonBuilder(this)
158+
159+
private[this] def slurp(log: Option[ProcessLogger], withIn: Boolean): String = {
160+
val buffer = new StringBuffer
161+
val code = this ! BasicIO(withIn, buffer, log)
162+
163+
if (code == 0) buffer.toString
164+
else scala.sys.error("Nonzero exit value: " + code)
165+
}
166+
167+
private[this] def lazyLines(
168+
withInput: Boolean,
169+
nonZeroException: Boolean,
170+
log: Option[ProcessLogger],
171+
capacity: Integer
172+
): LazyList[String] = {
173+
val lazilyListed = LazilyListed[String](nonZeroException, capacity)
174+
val process = run(BasicIO(withInput, lazilyListed.process, log))
175+
176+
// extract done from lazilyListed so that the anonymous function below closes over just the done and not the whole lazilyListed (see https://github.com/scala/bug/issues/12185)
177+
val done = lazilyListed.done
178+
179+
Spawn("LazyLines") {
180+
done {
181+
try process.exitValue()
182+
catch {
183+
case NonFatal(_) => -2
184+
}
185+
}
186+
}
187+
lazilyListed.lazyList
188+
}
189+
190+
@deprecated("internal", since = "2.13.4")
191+
private[this] def lineStream(
192+
withInput: Boolean,
193+
nonZeroException: Boolean,
194+
log: Option[ProcessLogger],
195+
capacity: Integer
196+
): Stream[String] = {
197+
val streamed = Streamed[String](nonZeroException, capacity)
198+
val process = run(BasicIO(withInput, streamed.process, log))
199+
200+
Spawn("LineStream")(streamed done process.exitValue())
201+
streamed.stream()
202+
}
203+
204+
private[this] def runBuffered(log: ProcessLogger, connectInput: Boolean) =
205+
log buffer run(log, connectInput).exitValue()
206+
207+
def canPipeTo = false
208+
def hasExitValue = true
209+
}
210+
211+
private[process] class URLImpl(url: URL) extends URLBuilder with Source {
212+
protected def toSource = new URLInput(url)
213+
}
214+
private[process] class FileImpl(base: File) extends FileBuilder with Sink with Source {
215+
protected def toSource = new FileInput(base)
216+
protected def toSink = new FileOutput(base, false)
217+
218+
def #<<(f: File): ProcessBuilder = #<<(new FileInput(f))
219+
def #<<(u: URL): ProcessBuilder = #<<(new URLInput(u))
220+
def #<<(s: => InputStream): ProcessBuilder = #<<(new IStreamBuilder(s, "<input stream>"))
221+
def #<<(b: ProcessBuilder): ProcessBuilder = new PipedBuilder(b, new FileOutput(base, true), false)
222+
}
223+
224+
private[process] abstract class BasicBuilder extends AbstractBuilder {
225+
protected[this] def checkNotThis(a: ProcessBuilder) = require(a != this, "Compound process '" + a + "' cannot contain itself.")
226+
final def run(io: ProcessIO): Process = {
227+
val p = createProcess(io)
228+
p.start()
229+
p
230+
}
231+
protected[this] def createProcess(io: ProcessIO): BasicProcess
232+
}
233+
234+
private[process] abstract class SequentialBuilder(
235+
a: ProcessBuilder,
236+
b: ProcessBuilder,
237+
operatorString: String
238+
) extends BasicBuilder {
239+
240+
checkNotThis(a)
241+
checkNotThis(b)
242+
override def toString = " ( " + a + " " + operatorString + " " + b + " ) "
243+
}
244+
245+
private[process] class PipedBuilder(
246+
first: ProcessBuilder,
247+
second: ProcessBuilder,
248+
toError: Boolean
249+
) extends SequentialBuilder(first, second, if (toError) "#|!" else "#|") {
250+
251+
override def createProcess(io: ProcessIO) = new PipedProcesses(first, second, io, toError)
252+
}
253+
254+
private[process] class AndBuilder(
255+
first: ProcessBuilder,
256+
second: ProcessBuilder
257+
) extends SequentialBuilder(first, second, "#&&") {
258+
override def createProcess(io: ProcessIO) = new AndProcess(first, second, io)
259+
}
260+
261+
private[process] class OrBuilder(
262+
first: ProcessBuilder,
263+
second: ProcessBuilder
264+
) extends SequentialBuilder(first, second, "#||") {
265+
override def createProcess(io: ProcessIO) = new OrProcess(first, second, io)
266+
}
267+
268+
private[process] class SequenceBuilder(
269+
first: ProcessBuilder,
270+
second: ProcessBuilder
271+
) extends SequentialBuilder(first, second, "###") {
272+
override def createProcess(io: ProcessIO) = new ProcessSequence(first, second, io)
273+
}
274+
}

0 commit comments

Comments
 (0)