Skip to content

Commit 6caeefa

Browse files
committed
Merge pull request #61 from Ichoran/step-stream
Compatibility for Java 8 Streams through Steppers, a Spliterator/Iterator hybrid
2 parents 70fa4da + d3606b7 commit 6caeefa

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+11051
-3
lines changed

README.md

Lines changed: 131 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,134 @@ class Test {
117117
```
118118

119119

120-
## Future work
121-
- Converters for `java.util.stream`
122-
- [`Spliterator`](https://docs.oracle.com/javase/8/docs/api/java/util/Spliterator.html)s for Scala collections
120+
## Converters from Scala collections to Java 8 Streams
121+
122+
Scala collections gain `seqStream` and `parStream` as extension methods that produce a Java 8 Stream
123+
running sequentially or in parallel, respectively. These are automatically specialized to a primitive
124+
type if possible. For instance, `List(1,2).seqStream` produces an `IntStream`. Maps additionally have
125+
`seqKeyStream`, `seqValueStream`, `parKeyStream`, and `parValueStream` methods.
126+
127+
Scala collections also gain `accumulate` and `stepper` methods that produce utility collections that
128+
can be useful when working with Java 8 Streams. `accumulate` produces an `Accumulator` or its primitive
129+
counterpart (`DoubleAccumulator`, etc.), which is a low-level collection designed for efficient collection
130+
and dispatching of results to and from Streams. Unlike most collections, it can contain more than
131+
`Int.MaxValue` elements. `stepper` produces a `Stepper` which is a fusion of `Spliterator` and `Iterator`.
132+
`Stepper`s underlie the Scala collections' instances of Java 8 Streams.
133+
134+
Java 8 Streams gain `toScala[Coll]` and `accumulate` methods, to make it easy to produce Scala collections
135+
or Accumulators, respectively, from Java 8 Streams. For instance, `myStream.to[Vector]` will collect the
136+
contents of a Stream into a `scala.collection.immutable.Vector`. Note that standard sequential builders
137+
are used for collections, so this is best done to gather the results of an expensive computation.
138+
139+
Finally, there is a Java class, `ScalaStreamer`, that has a series of `from` methods that can be used to
140+
obtain Java 8 Streams from Scala collections from within Java.
141+
142+
#### Performance Considerations
143+
144+
For sequential operations, Scala's `iterator` almost always equals or exceeds the performance of a Java 8 stream. Thus,
145+
one should favor `iterator` (and its richer set of operations) over `seqStream` for general use. However, long
146+
chains of processing of primitive types can sometimes benefit from the manually specialized methods in `DoubleStream`,
147+
`IntStream`, and `LongStream`.
148+
149+
Note that although `iterator` typically has superior performance in a sequential context, the advantage is modest
150+
(usually less than 50% higher throughput for `iterator`).
151+
152+
For parallel operations, `parStream` and even `seqStream.parallel` meets or exceeds the performance of Scala parallel
153+
collections methods (invoked with `.par`). Especially for small collections, the difference can be substantial. In
154+
some cases, when a Scala (parallel) collection is the ultimate result, Scala parallel collections can have an advantage
155+
as the collection can (in some cases) be built in parallel.
156+
157+
Because the wrappers are invoked based on the static type of the collection, there are also cases where parallelization
158+
is inefficient when interfacing with Java 8 Streams (e.g. when a collection is typed as `Seq[String]` so might have linear
159+
access like `List`, but actually is a `WrappedArray[String]` that can be efficiently parallelized) but can be efficient
160+
with Scala parallel collections. The `parStream` method is only available when the static type is known to be compatible
161+
with rapid parallel operation; `seqStream` can be parallelized by using `.parallel`, but may or may not be efficient.
162+
163+
If the operations available on Java 8 Streams are sufficient, the collection type is known statically with enough precision
164+
to enable parStream, and an `Accumulator` or non-collection type is an acceptable result, Java 8 Streams will essentially
165+
always outperform the Scala parallel collections.
166+
167+
#### Scala Usage Example
168+
169+
```scala
170+
import scala.compat.java8.StreamConverters._
171+
172+
object Test {
173+
val m = collection.immutable.HashMap("fish" -> 2, "bird" -> 4)
174+
val s = m.parValueStream.sum // 6, potientially computed in parallel
175+
val t = m.seqKeyStream.toScala[List] // List("fish", "bird")
176+
val a = m.accumulate // Accumulator[(String, Int)]
177+
178+
val n = a.stepper.fold(0)(_ + _._1.length) +
179+
a.parStream.count // 8 + 2 = 10
180+
181+
val b = java.util.Arrays.stream(Array(2L, 3L, 4L)).
182+
accumulate // LongAccumulator
183+
val l = b.to[List] // List(2L, 3L, 4L)
184+
}
185+
```
186+
187+
#### Using Java 8 Streams with Scala Function Converters
188+
189+
Scala can emit Java SAMs for lambda expressions that are arguments to methods that take a Java SAM rather than
190+
a Scala Function. However, it can be convenient to restrict the SAM interface to interactions with Java code
191+
(including Java 8 Streams) rather than having it propagate throughout Scala code.
192+
193+
Using Java 8 Stream converters together with function converters allows one to accomplish this with only a modest
194+
amount of fuss.
195+
196+
Example:
197+
198+
```scala
199+
import scala.compat.java8.FunctionConverters._
200+
import scala.compat.java8.StreamConverters._
201+
202+
def mapToSortedString[A](xs: Vector[A], f: A => String, sep: String) =
203+
xs.parStream. // Creates java.util.stream.Stream[String]
204+
map[String](f.asJava).sorted. // Maps A to String and sorts (in parallel)
205+
toArray.mkString(sep) // Back to an Array to use Scala's mkString
206+
```
207+
208+
Note that explicit creation of a new lambda will tend to lead to improved type inference and at least equal
209+
performance:
210+
211+
```scala
212+
def mapToSortedString[A](xs: Vector[A], f: A => String, sep: String) =
213+
xs.parStream.
214+
map[String](a => f(a)).sorted. // Explicit lambda creates a SAM wrapper for f
215+
toArray.mkString(sep)
216+
```
217+
218+
#### Java Usage Example
219+
220+
To convert a Scala collection to a Java 8 Stream from within Java, it usually
221+
suffices to call `ScalaStreaming.from(xs)` on your collection `xs`. If `xs` is
222+
a map, you may wish to get the keys or values alone by using `fromKeys` or
223+
`fromValues`. If the collection has an underlying representation that is not
224+
efficiently parallelized (e.g. `scala.collection.immutable.List`), then
225+
`fromAccumulated` (and `fromAccumulatedKeys` and `fromAccumulatedValues`) will
226+
first gather the collection into an `Accumulator` and then return a stream over
227+
that accumulator. If not running in parallel, `from` is preferable (faster and
228+
less memory usage).
229+
230+
Note that a Scala `Iterator` cannot fulfill the contract of a Java 8 Stream
231+
(because it cannot support `trySplit` if it is called). Presently, one must
232+
call `fromAccumulated` on the `Iterator` to cache it, even if the Stream will
233+
be evaluated sequentially, or wrap it as a Java Iterator and use static
234+
methods in `Spliterator` to wrap that as a `Spliterator` and then a `Stream`.
235+
236+
Here is an example of conversion of a Scala collection within Java 8:
237+
238+
```java
239+
import scala.collection.mutable.ArrayBuffer;
240+
import scala.compat.java8.ScalaStreaming;
241+
242+
public class StreamConvertersExample {
243+
public int MakeAndUseArrayBuffer() {
244+
ArrayBuffer<String> ab = new ArrayBuffer<String>();
245+
ab.$plus$eq("salmon");
246+
ab.$plus$eq("herring");
247+
return ScalaStreaming.from(ab).mapToInt(x -> x.length()).sum(); // 6+7 = 13
248+
}
249+
}
250+
```

benchmark/README.md

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
# Benchmark suite for Java 8 Streams compatibility layer
2+
3+
This project is intended to support semi-manual benchmarking of the Java 8 streams compatibility layer in Scala collections.
4+
5+
Because the benchmarking is **very computationally expensive** it should be done occasionally, not automatically.
6+
7+
## Code generation step
8+
9+
1. Run `sbt console`
10+
11+
2. If the `JmhBench.scala` file already exists, delete it.
12+
13+
3. Enter `bench.codegen.Generate.jmhBench()` to generate the `JmhBench.scala` file.
14+
15+
## Benchmarking step
16+
17+
1. Make sure your terminal has plenty of lines of scrollback. (A couple thousand should do.)
18+
19+
2. Run `sbt`
20+
21+
3. Enter `jmh:run -i 5 -wi 3 -f5`. Wait overnight.
22+
23+
4. Clip off the last set of lines from the terminal window starting before the line that contains `[info] # Run complete. Total time:` and including that line until the end.
24+
25+
5. Save that in the file `results/jmhbench.log`
26+
27+
## Comparison step
28+
29+
1. Run `sbt console`
30+
31+
2. Enter `bench.examine.SpeedReports()`
32+
33+
3. Look at the ASCII art results showing speed comparisons.

benchmark/build.sbt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
enablePlugins(JmhPlugin)
2+
3+
lazy val root = (project in file(".")).settings(
4+
name := "java8-compat-bench",
5+
scalaVersion := "2.11.7",
6+
crossScalaVersions := List("2.11.7" /* TODO, "2.12.0-M3"*/),
7+
organization := "org.scala-lang.modules",
8+
version := "0.6.0-SNAPSHOT",
9+
unmanagedJars in Compile ++= Seq(baseDirectory.value / "../target/scala-2.11/scala-java8-compat_2.11-0.6.0-SNAPSHOT.jar")
10+
)

benchmark/project/plugins.sbt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.2.5")

0 commit comments

Comments
 (0)