Compatibility for Java 8 Streams through Steppers, a Spliterator/Iterator hybrid #61

Merged on Mar 31, 2016 (56 commits)
1562feb
Basic implementation of Stepper which covers functionality of both It…
Ichoran Apr 30, 2015
f476c42
Further work on core stepper API.
Ichoran May 2, 2015
1cf9ea2
Basic conversion of Spliterator to Stepper works.
Ichoran Aug 23, 2015
ec8ebd1
Work in progress on Filter etc..
Ichoran Aug 30, 2015
bc909f4
Refactored names a bit, tried to make interface work a little better.
Ichoran Sep 2, 2015
7b6e118
Added outline of a test for Stepper.
Ichoran Sep 7, 2015
5f7d2d7
Converted to covariant 2nd arg in Stepper, introduced StepperLike.
Ichoran Sep 8, 2015
649dec4
Moved classes outside of StepperTest to see if this helps exception.
Ichoran Sep 8, 2015
e6c8ffa
Fixed TryStepper impl to avoid specialization bug. Added tests.
Ichoran Sep 8, 2015
2550aff
Finished testing terminal operations for Stepper, save spliterator wh…
Ichoran Sep 9, 2015
6ee09ef
Added proxy-to-spliterator to default Stepper trait.
Ichoran Sep 12, 2015
37ea39c
Tests on all the various permutations of Stepper-Spliterator converters.
Ichoran Sep 12, 2015
04bc02a
Fixed a bug in tryStep of TryStepper by adding a protected tryUncache…
Ichoran Sep 12, 2015
e7033e3
Finished tests of basic steppers and fixed bugs in TryStepper.
Ichoran Sep 13, 2015
6baf832
Basic interoperability between Java8 Streams and Scala collections.
Ichoran Jul 13, 2015
302fd15
Pulled out abstractable part of Array steppers for use by any Indexed.
Ichoran Sep 27, 2015
ecf85a3
Abstracted indexed stepping over Double, Int, Long specialized versions.
Ichoran Sep 27, 2015
e562451
IndexedSeqOptimized added to Stream compat via Stepper.
Ichoran Sep 27, 2015
390bbab
FlatHashTable-based collections have steppers and can be converted to…
Ichoran Sep 30, 2015
a482301
Vector works with Stepper, and if you use the raw Stepper interface
Ichoran Oct 4, 2015
4c72bd9
Added a bunch of comprehensiveness tests to StepConvertersTests.
Ichoran Oct 7, 2015
40c0117
Ranges step quickly. Nearly complete coverage testing for generics.
Ichoran Oct 7, 2015
4013be6
Filled in all the stepper tests and simplifed IndexedSeqOptimized dis…
Ichoran Oct 8, 2015
e4c18a3
Simplified StepsLikeIndexed implementation.
Ichoran Oct 8, 2015
27e72aa
Added missing file! Also covered all of IndexedSeqLike.
Ichoran Oct 8, 2015
ffeb369
LinearSeq conversions in.
Ichoran Oct 11, 2015
a221edc
Added iterator interface. Probably need to have it store initial seg…
Ichoran Oct 12, 2015
f9d4e35
Added key and value steppers to maps.
Ichoran Oct 12, 2015
d5bb589
Distinguish between good and sorta acceptable (sequential traversal) …
Ichoran Oct 12, 2015
f13b0f3
Linked and regular HashMaps working.
Ichoran Oct 13, 2015
fedc1eb
Immutable HashMaps are now nicely steppable (and thus can be streamed).
Ichoran Oct 18, 2015
7413c0c
BitSet added.
Ichoran Oct 19, 2015
edf0644
Added marker trait to indicate when splits are efficient.
Ichoran Oct 25, 2015
3f02f54
Modified stream converters to mostly defer to StepConverters.
Ichoran Oct 25, 2015
ace87d4
Finished transition to only allowing parStream on Steppers that suppo…
Ichoran Oct 26, 2015
72fb53d
Fixed up tests after disallowing implicit Accumulator creation.
Ichoran Nov 29, 2015
675f2ed
Factored out MakeXyzStepper traits into `MakesSteppers`.
Ichoran Nov 29, 2015
66756d1
Split out steppers for Array, IndexedSeq, LinearSeq, Range, Vector
Ichoran Nov 29, 2015
2674205
Pulled out more steppers.
Ichoran Nov 29, 2015
4c5f0a1
Moved Iterator and String steppers to their own files.
Ichoran Nov 29, 2015
02d56d5
Moved all value classes out of StepConverters.
Ichoran Nov 29, 2015
c96039d
Made StreamConverters responsible for Steps and Accumulators also.
Ichoran Nov 29, 2015
043672b
Java class full of static methods that give streams from collections.
Ichoran Dec 1, 2015
86370b0
Reorganized namespaces for Stream compatibility.
Ichoran Dec 2, 2015
e77ca87
Adding benchmarking: made generators for collections.
Ichoran Dec 6, 2015
970278c
Create a bunch of classes for benchmarking.
Ichoran Dec 8, 2015
82ec9a5
Finished Java collection stream typeclasses. Writing code generator …
Ichoran Dec 20, 2015
021490b
Got a little more of the code gen framework written.
Ichoran Dec 22, 2015
733a035
Starting to fill in code generators. Have basic correctness test sev…
Ichoran Jan 1, 2016
75a625d
Generator for correctness almost complete.
Ichoran Jan 2, 2016
29b5ad1
All tests for agreement of results pass.
Ichoran Jan 2, 2016
00977b8
Added overlooked Operations file!
Ichoran Jan 2, 2016
8ab0468
JMH benchmarks working, and take less time than forever.
Ichoran Jan 3, 2016
bea1d39
Added docs and benchmark results.
Ichoran Jan 8, 2016
af5de57
Example of Scala collection to Java 8 Stream from within Java.
Ichoran Jan 8, 2016
d3606b7
Describe how to get a Stream from Scala colls in Java.
Ichoran Feb 18, 2016
134 changes: 131 additions & 3 deletions README.md
@@ -117,6 +117,134 @@ class Test {
```


## Future work
- Converters for `java.util.stream`
- [`Spliterator`](https://docs.oracle.com/javase/8/docs/api/java/util/Spliterator.html)s for Scala collections
## Converters from Scala collections to Java 8 Streams

Scala collections gain `seqStream` and `parStream` as extension methods that produce a Java 8 Stream
Contributor

I'm wondering if seq and par in Scala collections should be renamed to sequential and parallel. AFAICT they see little use in practice, seq is confusing (because we also have a Seq type) and Java now uses sequential and parallel for streams.

Contributor Author

I wonder the same thing, but I don't think it matters much to this API. We can have the names be as above or sequentialStream and parallelStream or whatever regardless.

running sequentially or in parallel, respectively. These are automatically specialized to a primitive
type if possible. For instance, `List(1,2).seqStream` produces an `IntStream`. Maps additionally have
`seqKeyStream`, `seqValueStream`, `parKeyStream`, and `parValueStream` methods.
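
On the Java side, the distinction this specialization targets is between a boxed `Stream<Integer>` and a primitive `IntStream`. The sketch below uses only JDK classes (no Scala collections are involved), and the class name `BoxedVsPrimitive` is made up for illustration:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class BoxedVsPrimitive {
    // Boxed stream: elements are Integer objects, so a sum needs an explicit reduction.
    static int boxedSum(List<Integer> xs) {
        Stream<Integer> s = xs.stream();
        return s.reduce(0, Integer::sum);
    }

    // Primitive stream: IntStream works on unboxed ints and offers sum() directly.
    static int primitiveSum(List<Integer> xs) {
        IntStream s = xs.stream().mapToInt(Integer::intValue);
        return s.sum();
    }

    public static void main(String[] args) {
        List<Integer> xs = Arrays.asList(1, 2);
        System.out.println(boxedSum(xs));      // 3
        System.out.println(primitiveSum(xs));  // 3
    }
}
```

A converter that hands back an `IntStream` directly saves the caller the `mapToInt` step shown in `primitiveSum`.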

Scala collections also gain `accumulate` and `stepper` methods that produce utility collections that
Contributor

When would you use these methods? It looks like accumulate always copies the collection sequentially, which does not seem useful to me as a separate feature. For sequential streams, I'd expect that I could just use the original collection, and for parallel streams I'd want the implementation to decide whether copying is really necessary or the original Scala parallel collection (or even a sequential collection like ArrayBuffer) can be used efficiently. Building an accumulator sequentially could still be done with to[Accumulator], which would probably set the right expectations for performance characteristics.

Contributor Author

You would use this as a high-performance intermediate in stream processing when you need to build something and keep working on it. Internally, I also use it for collections that don't parallelize well. (I intended to only use them when parStream or the like was called, as an up-front sequential step; not sure how far I got in that.) Since you want to use it as an intermediate, you may as well expose it at the very beginning to make processing consistent.

Contributor Author

Incidentally, you could also just drop all the Accumulator stuff and use arrays instead, if you never needed streams longer than 2G. They're about the same speed. Everything else is slower (all Java or Scala collections I tested).

Contributor

I can see why you'd want to use it internally, I'm just concerned about exposing new concepts like Accumulator and Stepper in the API at this point. If there's no compelling use-case I'd just leave them internal and see if a need for more API arises.

Contributor Author

Adriaan asked me to write a framework for exposing more collections internals so it was easier to do things like write parallelizing code. It seemed that having a Stepper already there would be a benefit. But I agree, it could probably be hidden as an implementation detail (at least with private[scala.compat]).

I'm not so sure it makes sense to hide Accumulator. If you want to safely collect intermediate results of unknown size, it is the thing to use. There are no equivalents.

Contributor

Right, for Accumulator I would only argue for removal of the accumulator convenience methods on collections. The type needs to be exposed as part of the API.

Contributor Author

Fair enough. I'm not sure inconvenience helps more than simplification, but I don't have a strong feeling either way.

can be useful when working with Java 8 Streams. `accumulate` produces an `Accumulator` or its primitive
counterpart (`DoubleAccumulator`, etc.), which is a low-level collection designed for efficient collection
and dispatching of results to and from Streams. Unlike most collections, it can contain more than
`Int.MaxValue` elements. `stepper` produces a `Stepper`, which is a fusion of `Spliterator` and `Iterator`.
`Stepper`s underlie the Scala collections' instances of Java 8 Streams.
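
To make the "fusion of `Spliterator` and `Iterator`" idea concrete, here is a minimal Java sketch of a single object that implements both JDK interfaces over an array slice. It is not the library's `Stepper`; the class name `ArrayStepperSketch` and its fields are hypothetical and exist only for this illustration.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.StreamSupport;

// Hypothetical sketch (not the library's Stepper): one object that can be
// stepped through like an Iterator or split like a Spliterator.
final class ArrayStepperSketch<T> implements Iterator<T>, Spliterator<T> {
    private final T[] data;
    private int i;          // index of the next element to return
    private final int end;  // exclusive upper bound

    ArrayStepperSketch(T[] data, int from, int end) {
        this.data = data;
        this.i = from;
        this.end = end;
    }

    // Iterator side
    @Override public boolean hasNext() { return i < end; }
    @Override public T next() {
        if (i >= end) throw new NoSuchElementException();
        return data[i++];
    }

    // Spliterator side
    @Override public boolean tryAdvance(Consumer<? super T> action) {
        if (i >= end) return false;
        action.accept(data[i++]);
        return true;
    }
    @Override public Spliterator<T> trySplit() {
        int mid = (i + end) >>> 1;
        if (mid <= i) return null;   // fewer than two elements left: do not split
        ArrayStepperSketch<T> prefix = new ArrayStepperSketch<>(data, i, mid);
        i = mid;                     // this object keeps the second half
        return prefix;
    }
    @Override public long estimateSize() { return end - i; }
    @Override public int characteristics() { return ORDERED | SIZED | SUBSIZED; }

    // Iterator and Spliterator both declare a default forEachRemaining,
    // so a class implementing both has to override it.
    @Override public void forEachRemaining(Consumer<? super T> action) {
        while (i < end) action.accept(data[i++]);
    }

    public static void main(String[] args) {
        String[] words = {"salmon", "herring", "trout"};
        ArrayStepperSketch<String> st = new ArrayStepperSketch<>(words, 0, words.length);
        System.out.println(st.next());   // salmon (used as an Iterator)
        // The rest is consumed as a parallel Stream (used as a Spliterator).
        System.out.println(StreamSupport.stream(st, true).mapToInt(String::length).sum());  // 12
    }
}
```

Because the same object carries the cursor for both views, elements consumed through `next()` are not seen again by the stream built on top of it.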

Java 8 Streams gain `toScala[Coll]` and `accumulate` methods, to make it easy to produce Scala collections
or Accumulators, respectively, from Java 8 Streams. For instance, `myStream.toScala[Vector]` will collect the
contents of a Stream into a `scala.collection.immutable.Vector`. Note that standard sequential builders
are used for collections, so this is best done to gather the results of an expensive computation.

Finally, there is a Java class, `ScalaStreaming`, that has a series of `from` methods that can be used to
obtain Java 8 Streams from Scala collections from within Java.

#### Performance Considerations

For sequential operations, Scala's `iterator` almost always equals or exceeds the performance of a Java 8 stream. Thus,
one should favor `iterator` (and its richer set of operations) over `seqStream` for general use. However, long
chains of processing of primitive types can sometimes benefit from the manually specialized methods in `DoubleStream`,
`IntStream`, and `LongStream`.

Note that although `iterator` typically has superior performance in a sequential context, the advantage is modest
(usually less than 50% higher throughput for `iterator`).

For parallel operations, `parStream` and even `seqStream.parallel` meet or exceed the performance of Scala parallel
Contributor

Are there use cases for having seqStream.parallel be different from parStream? I would expect a sequential stream that was constructed directly from a collection to return that collection's native parallel stream when you call .parallel.

Contributor Author

If you are sure that you're going to support parallel operations, you might want to do different work up front. I don't remember if I actually implemented that, but note that the steppers do not see the .parallel call; all they know is that at some point trySplit gets called on the Spliterator interface.

Contributor

Good point. You could override Stream.parallel to make the stepper know but that would only work for the initial stream, so it's probably not worth the extra complexity.

collections methods (invoked with `.par`). Especially for small collections, the difference can be substantial. However,
when a Scala (parallel) collection is the ultimate result, Scala parallel collections can have an advantage
because the collection can sometimes be built in parallel.

Because the wrappers are invoked based on the static type of the collection, there are also cases where parallelization
is inefficient when interfacing with Java 8 Streams (e.g. when a collection is typed as `Seq[String]` so might have linear
access like `List`, but actually is a `WrappedArray[String]` that can be efficiently parallelized) but can be efficient
with Scala parallel collections. The `parStream` method is only available when the static type is known to be compatible
with rapid parallel operation; `seqStream` can be parallelized by using `.parallel`, but may or may not be efficient.
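
The point that `.parallel` only switches the execution mode, while the quality of the splits depends on the source, can be seen with JDK classes alone. In this sketch, `ParallelToggle`, `splitsEvenly`, and `totalLength` are hypothetical names, and an iterator-backed spliterator stands in for a source without fast random access:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;

class ParallelToggle {
    // True if the source reports exact sizes for itself and for its splits,
    // which lets the Streams framework divide the work evenly.
    static boolean splitsEvenly(Spliterator<?> sp) {
        return sp.hasCharacteristics(Spliterator.SIZED | Spliterator.SUBSIZED);
    }

    static int totalLength(Stream<String> s) {
        return s.mapToInt(String::length).sum();
    }

    public static void main(String[] args) {
        List<String> xs = new ArrayList<>(Arrays.asList("salmon", "herring"));

        Stream<String> seq = xs.stream();
        System.out.println(seq.isParallel());   // false
        Stream<String> par = seq.parallel();    // same pipeline, now in parallel mode
        System.out.println(par.isParallel());   // true
        System.out.println(totalLength(par));   // 13

        // Array-backed source: sized, so it can be split into known halves.
        System.out.println(splitsEvenly(xs.spliterator()));   // true

        // Iterator-backed source: size unknown, so even splits are not available.
        Spliterator<String> fromIter =
            Spliterators.spliteratorUnknownSize(xs.iterator(), Spliterator.ORDERED);
        System.out.println(splitsEvenly(fromIter));           // false
    }
}
```

Both sources give the same result in parallel mode; only the cost of dividing the work differs.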

If the operations available on Java 8 Streams are sufficient, the collection type is known statically with enough precision
to enable `parStream`, and an `Accumulator` or non-collection type is an acceptable result, Java 8 Streams will essentially
always outperform the Scala parallel collections.

#### Scala Usage Example

```scala
import scala.compat.java8.StreamConverters._

object Test {
  val m = collection.immutable.HashMap("fish" -> 2, "bird" -> 4)
  val s = m.parValueStream.sum          // 6, potentially computed in parallel
  val t = m.seqKeyStream.toScala[List]  // List("fish", "bird")
  val a = m.accumulate                  // Accumulator[(String, Int)]

  val n = a.stepper.fold(0)(_ + _._1.length) +
    a.parStream.count                   // 8 + 2 = 10

  val b = java.util.Arrays.stream(Array(2L, 3L, 4L)).
    accumulate                          // LongAccumulator
  val l = b.to[List]                    // List(2L, 3L, 4L)
}
```

#### Using Java 8 Streams with Scala Function Converters

Scala can emit Java SAMs for lambda expressions that are arguments to methods that take a Java SAM rather than
a Scala Function. However, it can be convenient to restrict the SAM interface to interactions with Java code
(including Java 8 Streams) rather than having it propagate throughout Scala code.

Using Java 8 Stream converters together with function converters allows one to accomplish this with only a modest
amount of fuss.

Example:

```scala
import scala.compat.java8.FunctionConverters._
import scala.compat.java8.StreamConverters._

def mapToSortedString[A](xs: Vector[A], f: A => String, sep: String) =
  xs.parStream.                    // Creates java.util.stream.Stream[A]
    map[String](f.asJava).sorted.  // Maps A to String and sorts (in parallel)
    toArray.mkString(sep)          // Back to an Array to use Scala's mkString
```

Note that explicit creation of a new lambda will tend to lead to improved type inference and at least equal
performance:

```scala
def mapToSortedString[A](xs: Vector[A], f: A => String, sep: String) =
  xs.parStream.
    map[String](a => f(a)).sorted.  // Explicit lambda creates a SAM wrapper for f
    toArray.mkString(sep)
```

#### Java Usage Example

To convert a Scala collection to a Java 8 Stream from within Java, it usually
suffices to call `ScalaStreaming.from(xs)` on your collection `xs`. If `xs` is
Contributor

How about calling this ScalaStream.of(xs) or ScalaStreamSupport.stream(xs) to stay closer to the Java APIs Stream.of and StreamSupport.stream?

Contributor Author

That's a good idea.

Contributor

Implementation: szeiger@f5988a7

a map, you may wish to get the keys or values alone by using `fromKeys` or
`fromValues`. If the collection has an underlying representation that is not
efficiently parallelized (e.g. `scala.collection.immutable.List`), then
`fromAccumulated` (and `fromAccumulatedKeys` and `fromAccumulatedValues`) will
first gather the collection into an `Accumulator` and then return a stream over
that accumulator. If not running in parallel, `from` is preferable (it is faster
and uses less memory).
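
The idea behind `fromAccumulated` (copy once into a structure that splits well, then stream over the copy) can be illustrated with JDK types alone. In this sketch a plain array stands in for the `Accumulator`; `AccumulateThenStream` and `parallelTotalLength` are hypothetical names, and none of this is the `ScalaStreaming` implementation.

```java
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;

class AccumulateThenStream {
    // Copies a (possibly linear-access) list into an array up front,
    // then runs the parallel pipeline over the array.
    static int parallelTotalLength(List<String> xs) {
        String[] copy = xs.toArray(new String[0]);   // sequential, one-time copy
        return Arrays.stream(copy).parallel().mapToInt(String::length).sum();
    }

    public static void main(String[] args) {
        List<String> linked = new LinkedList<>(Arrays.asList("salmon", "herring"));
        System.out.println(parallelTotalLength(linked));  // 13
    }
}
```

The up-front copy costs time and memory, which is why it only pays off when the stream is actually evaluated in parallel.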

Note that a Scala `Iterator` cannot fulfill the contract of a Java 8 Stream
(because it cannot support `trySplit` if it is called). Presently, one must
call `fromAccumulated` on the `Iterator` to cache it, even if the Stream will
be evaluated sequentially, or wrap it as a Java Iterator and use static
methods in `Spliterators` to wrap that as a `Spliterator` and then a `Stream`.
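
A minimal sketch of the second route, using the JDK's `java.util.Spliterators.spliteratorUnknownSize` and `java.util.stream.StreamSupport.stream`. It starts from a `java.util.Iterator`; how the Scala `Iterator` is wrapped as a Java one is left out here, and `IteratorToStream` and `streamOf` are hypothetical names.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class IteratorToStream {
    // Wraps a java.util.Iterator of unknown size as a sequential Stream.
    static <T> Stream<T> streamOf(Iterator<T> it) {
        Spliterator<T> sp = Spliterators.spliteratorUnknownSize(it, Spliterator.ORDERED);
        return StreamSupport.stream(sp, false);   // false = sequential
    }

    public static void main(String[] args) {
        Iterator<String> it = Arrays.asList("salmon", "herring").iterator();
        System.out.println(streamOf(it).mapToInt(String::length).sum());  // 13
    }
}
```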

Here is an example of conversion of a Scala collection within Java 8:

```java
import scala.collection.mutable.ArrayBuffer;
import scala.compat.java8.ScalaStreaming;

public class StreamConvertersExample {
  public int MakeAndUseArrayBuffer() {
    ArrayBuffer<String> ab = new ArrayBuffer<String>();
    ab.$plus$eq("salmon");
    ab.$plus$eq("herring");
    return ScalaStreaming.from(ab).mapToInt(x -> x.length()).sum();  // 6+7 = 13
  }
}
```
33 changes: 33 additions & 0 deletions benchmark/README.md
@@ -0,0 +1,33 @@
# Benchmark suite for Java 8 Streams compatibility layer

This project is intended to support semi-manual benchmarking of the Java 8 streams compatibility layer in Scala collections.

Because the benchmarking is **very computationally expensive**, it should be run occasionally by hand, not automatically.

## Code generation step

1. Run `sbt console`

2. If the `JmhBench.scala` file already exists, delete it.

3. Enter `bench.codegen.Generate.jmhBench()` to generate the `JmhBench.scala` file.

## Benchmarking step

1. Make sure your terminal has plenty of lines of scrollback. (A couple thousand should do.)

2. Run `sbt`

3. Enter `jmh:run -i 5 -wi 3 -f5`. Wait overnight.

4. Copy the final block of output from the terminal window: everything from the line that contains `[info] # Run complete. Total time:` (inclusive) through the end.

5. Save that in the file `results/jmhbench.log`
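
If the whole terminal session has been captured as lines of text, the clipping in step 4 can be sketched as below. `ClipJmhLog` and `tailFromMarker` are hypothetical names, not part of the benchmark project, and the sample lines in `main` are placeholders rather than real JMH output.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class ClipJmhLog {
    // Returns the lines from the last line containing `marker` through the end,
    // or an empty list if the marker never appears.
    static List<String> tailFromMarker(List<String> lines, String marker) {
        for (int i = lines.size() - 1; i >= 0; i--) {
            if (lines.get(i).contains(marker)) {
                return lines.subList(i, lines.size());
            }
        }
        return Collections.emptyList();
    }

    public static void main(String[] args) {
        // Placeholder lines standing in for a captured terminal session.
        List<String> session = Arrays.asList(
            "[info] (earlier output)",
            "[info] # Run complete. Total time: (elided)",
            "[info] (summary line 1)",
            "[info] (summary line 2)");
        for (String line : tailFromMarker(session, "# Run complete. Total time:")) {
            System.out.println(line);
        }
    }
}
```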

## Comparison step

1. Run `sbt console`

2. Enter `bench.examine.SpeedReports()`

3. Look at the ASCII art results showing speed comparisons.
10 changes: 10 additions & 0 deletions benchmark/build.sbt
@@ -0,0 +1,10 @@
enablePlugins(JmhPlugin)

lazy val root = (project in file(".")).settings(
name := "java8-compat-bench",
scalaVersion := "2.11.7",
crossScalaVersions := List("2.11.7" /* TODO, "2.12.0-M3"*/),
organization := "org.scala-lang.modules",
version := "0.6.0-SNAPSHOT",
unmanagedJars in Compile ++= Seq(baseDirectory.value / "../target/scala-2.11/scala-java8-compat_2.11-0.6.0-SNAPSHOT.jar")
)
1 change: 1 addition & 0 deletions benchmark/project/plugins.sbt
@@ -0,0 +1 @@
addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.2.5")