-
Notifications
You must be signed in to change notification settings - Fork 102
Compatibility for Java 8 Streams through Steppers, a Spliterator/Iterator hybrid #61
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
1562feb
f476c42
1cf9ea2
ec8ebd1
bc909f4
7b6e118
5f7d2d7
649dec4
e6c8ffa
2550aff
6ee09ef
37ea39c
04bc02a
e7033e3
6baf832
302fd15
ecf85a3
e562451
390bbab
a482301
4c72bd9
40c0117
4013be6
e4c18a3
27e72aa
ffeb369
a221edc
f9d4e35
d5bb589
f13b0f3
fedc1eb
7413c0c
edf0644
3f02f54
ace87d4
72fb53d
675f2ed
66756d1
2674205
4c5f0a1
02d56d5
c96039d
043672b
86370b0
e77ca87
970278c
82ec9a5
021490b
733a035
75a625d
29b5ad1
00977b8
8ab0468
bea1d39
af5de57
d3606b7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -117,6 +117,134 @@ class Test { | |
``` | ||
|
||
|
||
## Future work | ||
- Converters for `java.util.stream` | ||
- [`Spliterator`](https://docs.oracle.com/javase/8/docs/api/java/util/Spliterator.html)s for Scala collections | ||
## Converters from Scala collections to Java 8 Streams | ||
|
||
Scala collections gain `seqStream` and `parStream` as extension methods that produce a Java 8 Stream | ||
running sequentially or in parallel, respectively. These are automatically specialized to a primitive | ||
type if possible. For instance, `List(1,2).seqStream` produces an `IntStream`. Maps additionally have | ||
`seqKeyStream`, `seqValueStream`, `parKeyStream`, and `parValueStream` methods. | ||
|
||
Scala collections also gain `accumulate` and `stepper` methods that produce utility collections that | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When would you use these methods? It looks like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You would use this as a high-performance intermediate in stream processing when you need to build something and keep working on it. Internally, I also use it for collections that don't parallelize well. (I intended to only use them when parStream or the like was called, as an up-front sequential step; not sure how far I got in that.) Since you want to use it as an intermediate, you may as well expose it at the very beginning to make processing consistent. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Incidentally, you could also just drop all the Accumulator stuff and use arrays instead, if you never needed streams longer than 2G. They're about the same speed. Everything else is slower (all Java or Scala collections I tested). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I can see why you'd want to use it internally, I'm just concerned about exposing new concepts like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Adriaan asked me to write a framework for exposing more collections internals so it was easier to do things like write parallelizing code. It seemed that having a Stepper already there would be a benefit. But I agree, it could probably be hidden as an implementation detail (at least with I'm not so sure it makes sense to hide Accumulator. If you want to safely collect intermediate results of unknown size, it is the thing to use. There are no equivalents. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right, for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fair enough. I'm not sure inconvenience helps more than simplification, but I don't have a strong feeling either way. |
||
can be useful when working with Java 8 Streams. `accumulate` produces an `Accumulator` or its primitive | ||
counterpart (`DoubleAccumulator`, etc.), which is a low-level collection designed for efficient collection | ||
and dispatching of results to and from Streams. Unlike most collections, it can contain more than | ||
`Int.MaxValue` elements. `stepper` produces a `Stepper` which is a fusion of `Spliterator` and `Iterator`. | ||
`Stepper`s underlie the Scala collections' instances of Java 8 Streams. | ||
|
||
Java 8 Streams gain `toScala[Coll]` and `accumulate` methods, to make it easy to produce Scala collections | ||
or Accumulators, respectively, from Java 8 Streams. For instance, `myStream.to[Vector]` will collect the | ||
contents of a Stream into a `scala.collection.immutable.Vector`. Note that standard sequential builders | ||
are used for collections, so this is best done to gather the results of an expensive computation. | ||
|
||
Finally, there is a Java class, `ScalaStreamer`, that has a series of `from` methods that can be used to | ||
obtain Java 8 Streams from Scala collections from within Java. | ||
|
||
#### Performance Considerations | ||
|
||
For sequential operations, Scala's `iterator` almost always equals or exceeds the performance of a Java 8 stream. Thus, | ||
one should favor `iterator` (and its richer set of operations) over `seqStream` for general use. However, long | ||
chains of processing of primitive types can sometimes benefit from the manually specialized methods in `DoubleStream`, | ||
`IntStream`, and `LongStream`. | ||
|
||
Note that although `iterator` typically has superior performance in a sequential context, the advantage is modest | ||
(usually less than 50% higher throughput for `iterator`). | ||
|
||
For parallel operations, `parStream` and even `seqStream.parallel` meets or exceeds the performance of Scala parallel | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are there use cases for having There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you are sure that you're going to support parallel operations, you might want to do different work up front. I don't remember if I actually implemented that, but note that the steppers do not see the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point. You could override |
||
collections methods (invoked with `.par`). Especially for small collections, the difference can be substantial. In | ||
some cases, when a Scala (parallel) collection is the ultimate result, Scala parallel collections can have an advantage | ||
as the collection can (in some cases) be built in parallel. | ||
|
||
Because the wrappers are invoked based on the static type of the collection, there are also cases where parallelization | ||
is inefficient when interfacing with Java 8 Streams (e.g. when a collection is typed as `Seq[String]` so might have linear | ||
access like `List`, but actually is a `WrappedArray[String]` that can be efficiently parallelized) but can be efficient | ||
with Scala parallel collections. The `parStream` method is only available when the static type is known to be compatible | ||
with rapid parallel operation; `seqStream` can be parallelized by using `.parallel`, but may or may not be efficient. | ||
|
||
If the operations available on Java 8 Streams are sufficient, the collection type is known statically with enough precision | ||
to enable parStream, and an `Accumulator` or non-collection type is an acceptable result, Java 8 Streams will essentially | ||
always outperform the Scala parallel collections. | ||
|
||
#### Scala Usage Example | ||
|
||
```scala | ||
import scala.compat.java8.StreamConverters._ | ||
|
||
object Test { | ||
val m = collection.immutable.HashMap("fish" -> 2, "bird" -> 4) | ||
val s = m.parValueStream.sum // 6, potientially computed in parallel | ||
val t = m.seqKeyStream.toScala[List] // List("fish", "bird") | ||
val a = m.accumulate // Accumulator[(String, Int)] | ||
|
||
val n = a.stepper.fold(0)(_ + _._1.length) + | ||
a.parStream.count // 8 + 2 = 10 | ||
|
||
val b = java.util.Arrays.stream(Array(2L, 3L, 4L)). | ||
accumulate // LongAccumulator | ||
val l = b.to[List] // List(2L, 3L, 4L) | ||
} | ||
``` | ||
|
||
#### Using Java 8 Streams with Scala Function Converters | ||
|
||
Scala can emit Java SAMs for lambda expressions that are arguments to methods that take a Java SAM rather than | ||
a Scala Function. However, it can be convenient to restrict the SAM interface to interactions with Java code | ||
(including Java 8 Streams) rather than having it propagate throughout Scala code. | ||
|
||
Using Java 8 Stream converters together with function converters allows one to accomplish this with only a modest | ||
amount of fuss. | ||
|
||
Example: | ||
|
||
```scala | ||
import scala.compat.java8.FunctionConverters._ | ||
import scala.compat.java8.StreamConverters._ | ||
|
||
def mapToSortedString[A](xs: Vector[A], f: A => String, sep: String) = | ||
xs.parStream. // Creates java.util.stream.Stream[String] | ||
map[String](f.asJava).sorted. // Maps A to String and sorts (in parallel) | ||
toArray.mkString(sep) // Back to an Array to use Scala's mkString | ||
``` | ||
|
||
Note that explicit creation of a new lambda will tend to lead to improved type inference and at least equal | ||
performance: | ||
|
||
```scala | ||
def mapToSortedString[A](xs: Vector[A], f: A => String, sep: String) = | ||
xs.parStream. | ||
map[String](a => f(a)).sorted. // Explicit lambda creates a SAM wrapper for f | ||
toArray.mkString(sep) | ||
``` | ||
|
||
#### Java Usage Example | ||
|
||
To convert a Scala collection to a Java 8 Stream from within Java, it usually | ||
suffices to call `ScalaStreaming.from(xs)` on your collection `xs`. If `xs` is | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about calling this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's a good idea. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Implementation: szeiger@f5988a7 |
||
a map, you may wish to get the keys or values alone by using `fromKeys` or | ||
`fromValues`. If the collection has an underlying representation that is not | ||
efficiently parallelized (e.g. `scala.collection.immutable.List`), then | ||
`fromAccumulated` (and `fromAccumulatedKeys` and `fromAccumulatedValues`) will | ||
first gather the collection into an `Accumulator` and then return a stream over | ||
that accumulator. If not running in parallel, `from` is preferable (faster and | ||
less memory usage). | ||
|
||
Note that a Scala `Iterator` cannot fulfill the contract of a Java 8 Stream | ||
(because it cannot support `trySplit` if it is called). Presently, one must | ||
call `fromAccumulated` on the `Iterator` to cache it, even if the Stream will | ||
be evaluated sequentially, or wrap it as a Java Iterator and use static | ||
methods in `Spliterator` to wrap that as a `Spliterator` and then a `Stream`. | ||
|
||
Here is an example of conversion of a Scala collection within Java 8: | ||
|
||
```java | ||
import scala.collection.mutable.ArrayBuffer; | ||
import scala.compat.java8.ScalaStreaming; | ||
|
||
public class StreamConvertersExample { | ||
public int MakeAndUseArrayBuffer() { | ||
ArrayBuffer<String> ab = new ArrayBuffer<String>(); | ||
ab.$plus$eq("salmon"); | ||
ab.$plus$eq("herring"); | ||
return ScalaStreaming.from(ab).mapToInt(x -> x.length()).sum(); // 6+7 = 13 | ||
} | ||
} | ||
``` |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
# Benchmark suite for Java 8 Streams compatibility layer | ||
|
||
This project is intended to support semi-manual benchmarking of the Java 8 streams compatibility layer in Scala collections. | ||
|
||
Because the benchmarking is **very computationally expensive** it should be done occasionally, not automatically. | ||
|
||
## Code generation step | ||
|
||
1. Run `sbt console` | ||
|
||
2. If the `JmhBench.scala` file already exists, delete it. | ||
|
||
3. Enter `bench.codegen.Generate.jmhBench()` to generate the `JmhBench.scala` file. | ||
|
||
## Benchmarking step | ||
|
||
1. Make sure your terminal has plenty of lines of scrollback. (A couple thousand should do.) | ||
|
||
2. Run `sbt` | ||
|
||
3. Enter `jmh:run -i 5 -wi 3 -f5`. Wait overnight. | ||
|
||
4. Clip off the last set of lines from the terminal window starting before the line that contains `[info] # Run complete. Total time:` and including that line until the end. | ||
|
||
5. Save that in the file `results/jmhbench.log` | ||
|
||
## Comparison step | ||
|
||
1. Run `sbt console` | ||
|
||
2. Enter `bench.examine.SpeedReports()` | ||
|
||
3. Look at the ASCII art results showing speed comparisons. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
enablePlugins(JmhPlugin) | ||
|
||
lazy val root = (project in file(".")).settings( | ||
name := "java8-compat-bench", | ||
scalaVersion := "2.11.7", | ||
crossScalaVersions := List("2.11.7" /* TODO, "2.12.0-M3"*/), | ||
organization := "org.scala-lang.modules", | ||
version := "0.6.0-SNAPSHOT", | ||
unmanagedJars in Compile ++= Seq(baseDirectory.value / "../target/scala-2.11/scala-java8-compat_2.11-0.6.0-SNAPSHOT.jar") | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.2.5") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering if
seq
andpar
in Scala collections should be renamed tosequential
andparallel
. AFAICT they see little use in practice,seq
is confusing (because we also have aSeq
type) and Java now usessequential
andparallel
for streams.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder the same thing, but I don't think it matters much to this API. We can have the names be as above or
sequentialStream
andparallelStream
or whatever regardless.