Skip to content

Commit 043672b

Browse files
committed
Java class full of static methods that give streams from collections.
1 parent c96039d commit 043672b

File tree

10 files changed

+1068
-8
lines changed

10 files changed

+1068
-8
lines changed

README.md

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,49 @@ class Test {
117117
```
118118

119119

120-
## Future work
121-
- Converters for `java.util.stream`
122-
- [`Spliterator`](https://docs.oracle.com/javase/8/docs/api/java/util/Spliterator.html)s for Scala collections
120+
## Converters from Scala collections to Java 8 Streams
121+
122+
Scala collections gain `seqStream` and `parStream` as extension methods that produce a Java 8 Stream
123+
running sequentially or in parallel, respectively. These are automatically specialized to a primitive
124+
type if possible. For instance, `List(1,2).seqStream` produces an `IntStream`. Maps additionally have
125+
`seqKeyStream`, `seqValueStream`, `parKeyStream`, and `parValueStream` methods.
126+
127+
Scala collections also gain `accumulate` and `stepper` methods that produce utility collections that
128+
can be useful when working with Java 8 Streams. `accumulate` produces an `Accumulator` or its primitive
129+
counterpart (`DoubleAccumulator`, etc.), which is a low-level collection designed for efficient collection
130+
and dispatching of results to and from Streams. Unlike most collections, it can contain more than
131+
`Int.MaxValue` elements. `stepper` produces a `Stepper` which is a fusion of `Spliterator` and `Iterator`.
132+
`Stepper`s underlie the Scala collections' instances of Java 8 Streams.
133+
134+
Java 8 Streams gain `toScala[Coll]` and `accumulate` methods, to make it easy to produce Scala collections
135+
or Accumulators, respectively, from Java 8 Streams. For instance, `myStream.to[Vector]` will collect the
136+
contents of a Stream into a `scala.collection.immutable.Vector`. Note that standard sequential builders
137+
are used for collections, so this is best done to gather the results of an expensive computation.
138+
139+
Finally, there is a Java class, `ScalaStreamer`, that has a series of `from` methods that can be used to
140+
obtain Java 8 Streams from Scala collections from within Java.
141+
142+
#### Scala Usage Example
143+
144+
```scala
145+
import scala.compat.java8.StreamConverters._
146+
147+
object Test {
148+
val m = collection.immutable.HashMap("fish" -> 2, "bird" -> 4)
149+
val s = m.parValueStream.sum // 6, potientially computed in parallel
150+
val t = m.seqKeyStream.toScala[List] // List("fish", "bird")
151+
val a = t.accumulate // Accumulator[(String, Int)]
152+
153+
val n = a.splitter.fold(0)(_ + _._1.length) +
154+
a.parStream.count // 8 + 2 = 10
155+
156+
val b = java.util.Arrays.stream(Array(2L, 3L, 4L)).
157+
accumulate // LongAccumulator
158+
}
159+
```
160+
161+
#### Java Usage Example
162+
163+
```java
164+
// TODO -- write converter and create example
165+
```

src/main/java/scala/compat/java8/ScalaStreaming.java

Lines changed: 904 additions & 0 deletions
Large diffs are not rendered by default.

src/main/scala/scala/compat/java8/StreamConverters.scala

Lines changed: 56 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,10 @@ trait Priority1StreamConverters extends Priority2StreamConverters {
137137
*
138138
* Scala collections gain extension methods `seqStream` and
139139
* `parStream` that allow them to be used as the source of a `Stream`.
140+
* Some collections either intrinsically cannot be paralellized, or
141+
* could be but an efficient implementation is missing. It this case,
142+
* only `seqStream` is provided. If a collection cannot be stepped over
143+
* at all (e.g. `Traversable`), then it gains neither method.
140144
*
141145
* `Array` also gains `seqStream` and `parStream` methods, and calling those
142146
* on `Array[Double]`, `Array[Int]`, or `Array[Long]` will produce the
@@ -152,10 +156,32 @@ trait Priority1StreamConverters extends Priority2StreamConverters {
152156
* have custom accumulators with improved performance.
153157
*
154158
* Accumulators have `toArray`, `toList`, `iterator`, and `to[_]` methods
155-
* to convert to standard Scala collections.
159+
* to convert to standard Scala collections. Note that if you wish to
160+
* create an array from a `Stream`, going through an `Accumulator` is
161+
* not the most efficient option: just create the `Array` directly.
156162
*
157-
* Example:
158-
* ```
163+
* Internally, Scala collections implement a hybrid of `Iterator` and
164+
* `java.util.Spliterator` to implement `Stream` compatibility; these
165+
* are called `Stepper`s. In particular, they can test for the presence
166+
* of a next element using `hasStep`, can retrieve the next value with
167+
* `nextStep`, or can optionally retrieve and operate on a value if present
168+
* with `tryStep`, which works like `tryAdvance` in `java.util.Spliterator`.
169+
*
170+
* Every Scala collection that can be stepped
171+
* through has a `stepper` method implicitly provided. In addition,
172+
* maps have `keyStepper` and `valueStepper` methods. A limited number
173+
* of collections operations are defined on `Stepper`s, including conversion
174+
* to Scala collections with `to` or accumulation via `accumulate`.
175+
* `Stepper`s also implement `seqStream` and `parStream` to generate `Stream`s.
176+
* These are provided regardless of whether a `Stepper` can efficiently
177+
* subdivide itself for parallel processing (though one can check for the
178+
* presence of the `EfficientSubstep` trait to know that parallel execution will
179+
* not be limited by long sequential searching steps, and one can call
180+
* `anticipateParallelism` to warn a `Stepper` that it will be used in a parallel
181+
* context and thus may wish to make different tradeoffs).
182+
*
183+
* Examples:
184+
* {{{
159185
* import scala.compat.java8.StreamConverers._
160186
*
161187
* val s = Vector(1,2,3,4).parStream // Stream[Int]
@@ -165,7 +191,9 @@ trait Priority1StreamConverters extends Priority2StreamConverters {
165191
*
166192
* val t = Array(2.0, 3.0, 4.0).parStream // DoubleStream
167193
* val q = t.toScala[scala.collection.immutable.Queue] // Queue[Double]
168-
* ```
194+
*
195+
* val x = List(1L, 2L, 3L, 4L).stepper.parStream.sum // 10, potentially computed in parallel
196+
* }}}
169197
*/
170198
object StreamConverters
171199
extends Priority1StreamConverters
@@ -273,4 +301,28 @@ with converterImpls.Priority1AccumulatorConverters
273301
}
274302
}
275303
}
304+
305+
implicit val accumulateDoubleStepper = new AccumulatesFromStepper[Double, DoubleAccumulator] {
306+
def apply(stepper: Stepper[Double]) = {
307+
val a = new DoubleAccumulator
308+
while (stepper.hasStep) a += stepper.nextStep
309+
a
310+
}
311+
}
312+
313+
implicit val accumulateIntStepper = new AccumulatesFromStepper[Int, IntAccumulator] {
314+
def apply(stepper: Stepper[Int]) = {
315+
val a = new IntAccumulator
316+
while (stepper.hasStep) a += stepper.nextStep
317+
a
318+
}
319+
}
320+
321+
implicit val accumulateLongStepper = new AccumulatesFromStepper[Long, LongAccumulator] {
322+
def apply(stepper: Stepper[Long]) = {
323+
val a = new LongAccumulator
324+
while (stepper.hasStep) a += stepper.nextStep
325+
a
326+
}
327+
}
276328
}

src/main/scala/scala/compat/java8/collectionImpl/Accumulates.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ import language.implicitConversions
55
import scala.compat.java8.collectionImpl._
66
import scala.compat.java8.runtime._
77

8+
trait AccumulatesFromStepper[@specialized(Double, Int, Long) A, Acc <: AccumulatorLike[A, Acc]] {
9+
def apply(stepper: Stepper[A]): Acc
10+
}
11+
812
final class CollectionCanAccumulate[A](private val underlying: TraversableOnce[A]) extends AnyVal {
913
def accumulate: Accumulator[A] = {
1014
val a = new Accumulator[A]

src/main/scala/scala/compat/java8/collectionImpl/Accumulator.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,13 @@ object Accumulator {
214214

215215
/** A `BiConsumer` that merges `Accumulator`s, suitable for use with `java.util.stream.Stream`'s `collect` method. */
216216
def merger[A] = new java.util.function.BiConsumer[Accumulator[A], Accumulator[A]]{ def accept(a1: Accumulator[A], a2: Accumulator[A]) { a1 drain a2 } }
217+
218+
/** Builds an `Accumulator` from any `TraversableOnce` */
219+
def from[A](source: TraversableOnce[A]) = {
220+
val a = new Accumulator[A]
221+
source.foreach(a += _)
222+
a
223+
}
217224
}
218225

219226
private[java8] class AccumulatorStepper[A](private val acc: Accumulator[A]) extends AnyStepper[A] {

src/main/scala/scala/compat/java8/collectionImpl/AccumulatorConverters.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package scala.compat.java8.converterImpls
22

33
import language.implicitConversions
44

5+
import scala.compat.java8.collectionImpl._
6+
57
trait Priority3AccumulatorConverters {
68
implicit def collectionCanAccumulate[A](underlying: TraversableOnce[A]) = new CollectionCanAccumulate[A](underlying)
79
}
@@ -17,4 +19,17 @@ trait Priority1AccumulatorConverters extends Priority2AccumulatorConverters {
1719
implicit def accumulateDoubleArray(underlying: Array[Double]) = new AccumulateDoubleArray(underlying)
1820
implicit def accumulateIntArray(underlying: Array[Int]) = new AccumulateIntArray(underlying)
1921
implicit def accumulateLongArray(underlying: Array[Long]) = new AccumulateLongArray(underlying)
22+
23+
implicit def accumulateAnyStepper[A]: AccumulatesFromStepper[A, Accumulator[A]] =
24+
PrivateAccumulatorConverters.genericAccumulateAnyStepper.asInstanceOf[AccumulatesFromStepper[A, Accumulator[A]]]
25+
}
26+
27+
private[java8] object PrivateAccumulatorConverters {
28+
val genericAccumulateAnyStepper: AccumulatesFromStepper[Any, Accumulator[Any]] = new AccumulatesFromStepper[Any, Accumulator[Any]] {
29+
def apply(stepper: Stepper[Any]) = {
30+
val a = new Accumulator[Any]
31+
while (stepper.hasStep) a += stepper.nextStep
32+
a
33+
}
34+
}
2035
}

src/main/scala/scala/compat/java8/collectionImpl/DoubleAccumulator.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,13 @@ object DoubleAccumulator {
211211

212212
/** A `BiConsumer` that merges `DoubleAccumulator`s, suitable for use with `java.util.stream.DoubleStream`'s `collect` method. Suitable for `Stream[Double]` also. */
213213
def merger = new java.util.function.BiConsumer[DoubleAccumulator, DoubleAccumulator]{ def accept(a1: DoubleAccumulator, a2: DoubleAccumulator) { a1 drain a2 } }
214+
215+
/** Builds a `DoubleAccumulator` from any `Double`-valued `TraversableOnce` */
216+
def from[A](source: TraversableOnce[Double]) = {
217+
val a = new DoubleAccumulator
218+
source.foreach(a += _)
219+
a
220+
}
214221
}
215222

216223
private[java8] class DoubleAccumulatorStepper(private val acc: DoubleAccumulator) extends DoubleStepper {

src/main/scala/scala/compat/java8/collectionImpl/IntAccumulator.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,13 @@ object IntAccumulator {
218218

219219
/** A `BiConsumer` that merges `IntAccumulator`s, suitable for use with `java.util.stream.IntStream`'s `collect` method. Suitable for `Stream[Int]` also. */
220220
def merger = new java.util.function.BiConsumer[IntAccumulator, IntAccumulator]{ def accept(a1: IntAccumulator, a2: IntAccumulator) { a1 drain a2 } }
221+
222+
/** Builds an `IntAccumulator` from any `Int`-valued `TraversableOnce` */
223+
def from[A](source: TraversableOnce[Int]) = {
224+
val a = new IntAccumulator
225+
source.foreach(a += _)
226+
a
227+
}
221228
}
222229

223230
private[java8] class IntAccumulatorStepper(private val acc: IntAccumulator) extends IntStepper {

src/main/scala/scala/compat/java8/collectionImpl/LongAccumulator.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,13 @@ object LongAccumulator {
212212

213213
/** A `BiConsumer` that merges `LongAccumulator`s, suitable for use with `java.util.stream.LongStream`'s `collect` method. Suitable for `Stream[Long]` also. */
214214
def merger = new java.util.function.BiConsumer[LongAccumulator, LongAccumulator]{ def accept(a1: LongAccumulator, a2: LongAccumulator) { a1 drain a2 } }
215+
216+
/** Builds a `LongAccumulator` from any `Long`-valued `TraversableOnce` */
217+
def from[A](source: TraversableOnce[Long]) = {
218+
val a = new LongAccumulator
219+
source.foreach(a += _)
220+
a
221+
}
215222
}
216223

217224
private[java8] class LongAccumulatorStepper(private val acc: LongAccumulator) extends LongStepper {

src/main/scala/scala/compat/java8/collectionImpl/Stepper.scala

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,14 @@ import java.util.Spliterator
3636
* println(s.hasStep) // Prints `false`
3737
* }}}
3838
*/
39-
trait Stepper[@specialized(Double, Int, Long) A] extends StepperLike[A, Stepper[A]] {}
39+
trait Stepper[@specialized(Double, Int, Long) A] extends StepperLike[A, Stepper[A]] {
40+
/** Drains the contents of this stepper into an `Accumulator` or specialized variant thereof as appropriate.
41+
* This is a terminal operation.
42+
*
43+
* Note: accumulation will occur sequentially. To accumulate in parallel, use a `Stream` (i.e. `.parStream.accumulate`).
44+
*/
45+
def accumulate[Acc <: AccumulatorLike[A, Acc]](implicit accer: scala.compat.java8.converterImpls.AccumulatesFromStepper[A, Acc]) = accer(this)
46+
}
4047

4148
/** An (optional) marker trait that indicates that a `Stepper` can call `substep` with
4249
* at worst O(log N) time and space complexity, and that the division is likely to
@@ -159,6 +166,13 @@ trait StepperLike[@specialized(Double, Int, Long) A, +CC] { self =>
159166
def hasNext = self.hasStep
160167
def next = self.nextStep
161168
}
169+
170+
/** Returns a Scala collection of the type requested. */
171+
def to[Coll[_]](implicit cbf: collection.generic.CanBuildFrom[Nothing, A, Coll[A]]): Coll[A] = {
172+
val b = cbf()
173+
while (hasStep) b += nextStep
174+
b.result()
175+
}
162176
}
163177

164178

0 commit comments

Comments
 (0)