Skip to content

StreamConverters.toScala in 2.12-style #147

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 108 additions & 33 deletions src/main/scala/scala/compat/java8/StreamConverters.scala
Original file line number Diff line number Diff line change
Expand Up @@ -210,16 +210,16 @@ trait StreamExtensions {
// toScala for streams

implicit class StreamHasToScala[A](stream: Stream[A]) {
def accumulate: AnyAccumulator[A] = toScala(Accumulator)
def accumulate: AnyAccumulator[A] = toScalaFactory(Accumulator)


/**
* Copy the elements of this stream into a Scala collection.
*
* Converting a parallel streams to an [[Accumulator]] using `stream.toScala(Accumulator)`
* Converting a parallel streams to an [[Accumulator]] using `stream.toScalaFactory(Accumulator)`
* builds the result in parallel.
*
* A `toScala(Accumulator)` call automatically converts streams of boxed integers, longs or
* A `toScalaFactory(Accumulator)` call automatically converts streams of boxed integers, longs or
* doubles are converted to the primitive accumulators ([[IntAccumulator]], etc.).
*
* When converting a parallel stream to a different Scala collection, the stream is first
Expand All @@ -230,57 +230,75 @@ trait StreamExtensions {
* Sequential streams are directly converted to the target collection. If the target collection
* is lazy, the conversion is lazy as well.
*/
def toScala[C1](factory: collection.Factory[A, C1])(implicit info: AccumulatorFactoryInfo[A, C1]): C1 = {

private[java8] def toScalaFactory[C](factory: collection.Factory[A, C])(implicit info: AccumulatorFactoryInfo[A, C]): C = {
def anyAcc = stream.collect(AnyAccumulator.supplier[A], AnyAccumulator.adder[A], AnyAccumulator.merger[A])
if (info.companion == AnyAccumulator) anyAcc.asInstanceOf[C1]
else if (info.companion == IntAccumulator) stream.asInstanceOf[Stream[Int]].collect(IntAccumulator.supplier, IntAccumulator.boxedAdder, IntAccumulator.merger).asInstanceOf[C1]
else if (info.companion == LongAccumulator) stream.asInstanceOf[Stream[Long]].collect(LongAccumulator.supplier, LongAccumulator.boxedAdder, LongAccumulator.merger).asInstanceOf[C1]
else if (info.companion == DoubleAccumulator) stream.asInstanceOf[Stream[Double]].collect(DoubleAccumulator.supplier, DoubleAccumulator.boxedAdder, DoubleAccumulator.merger).asInstanceOf[C1]
if (info.companion == AnyAccumulator) anyAcc.asInstanceOf[C]
else if (info.companion == IntAccumulator) stream.asInstanceOf[Stream[Int]].collect(IntAccumulator.supplier, IntAccumulator.boxedAdder, IntAccumulator.merger).asInstanceOf[C]
else if (info.companion == LongAccumulator) stream.asInstanceOf[Stream[Long]].collect(LongAccumulator.supplier, LongAccumulator.boxedAdder, LongAccumulator.merger).asInstanceOf[C]
else if (info.companion == DoubleAccumulator) stream.asInstanceOf[Stream[Double]].collect(DoubleAccumulator.supplier, DoubleAccumulator.boxedAdder, DoubleAccumulator.merger).asInstanceOf[C]
else if (stream.isParallel) anyAcc.to(factory)
else factory.fromSpecific(stream.iterator.asScala)
}

/**
* Copy the elements of this stream into a Scala collection.
*
* For parallel streams, using [[accumulate]] is recommended as it builds the [[Accumulator]]
* in parallel.
*
* When converting a parallel stream to a different Scala collection, the stream is first
* converted into an [[Accumulator]], which supports parallel building. The accumulator is
* then converted to the target collection. Note that the stream is processed eagerly while
* building the accumulator, even if the target collection is lazy.
*
* Sequential streams are directly converted to the target collection. If the target collection
* is lazy, the conversion is lazy as well.
*/
def toScala[CC[_]](implicit factory: collection.Factory[A, CC[A]]): CC[A] = {
if (stream.isParallel) toScalaFactory(Accumulator).to(factory)
else factory.fromSpecific(stream.iterator.asScala)
}

/** Convert a generic Java Stream wrapping a primitive type to a corresponding primitive
* Stream.
*/
def unboxed[S](implicit unboxer: StreamUnboxer[A, S]): S = unboxer(stream)
}

implicit class StreamIntHasAccumulatePrimitive(s: Stream[Int]) {
def accumulatePrimitive: IntAccumulator = s.toScala(Accumulator)
def accumulatePrimitive: IntAccumulator = s.toScalaFactory(Accumulator)
}

implicit class StreamLongHasAccumulatePrimitive(s: Stream[Long]) {
def accumulatePrimitive: LongAccumulator = s.toScala(Accumulator)
def accumulatePrimitive: LongAccumulator = s.toScalaFactory(Accumulator)
}

implicit class StreamDoubleHasAccumulatePrimitive(s: Stream[Double]) {
def accumulatePrimitive: DoubleAccumulator = s.toScala(Accumulator)
def accumulatePrimitive: DoubleAccumulator = s.toScalaFactory(Accumulator)
}

implicit class StreamJIntegerHasAccumulatePrimitive(s: Stream[java.lang.Integer]) {
def accumulatePrimitive: IntAccumulator = s.toScala(Accumulator)
def accumulatePrimitive: IntAccumulator = s.toScalaFactory(Accumulator)
}

implicit class StreamJLongHasAccumulatePrimitive(s: Stream[java.lang.Long]) {
def accumulatePrimitive: LongAccumulator = s.toScala(Accumulator)
def accumulatePrimitive: LongAccumulator = s.toScalaFactory(Accumulator)
}

implicit class StreamJDoubleHasAccumulatePrimitive(s: Stream[java.lang.Double]) {
def accumulatePrimitive: DoubleAccumulator = s.toScala(Accumulator)
def accumulatePrimitive: DoubleAccumulator = s.toScalaFactory(Accumulator)
}

implicit class IntStreamHasToScala(stream: IntStream) {
def accumulate: IntAccumulator = toScala(IntAccumulator)
def accumulate: IntAccumulator = toScalaFactory(IntAccumulator)

/**
* Copy the elements of this stream into a Scala collection.
*
* Converting a parallel streams to an [[Accumulator]] using `stream.toScala(Accumulator)`
* Converting a parallel streams to an [[Accumulator]] using `stream.toScalaFactory(Accumulator)`
* builds the result in parallel.
*
* A `toScala(Accumulator)` call automatically converts the `IntStream` to a primitive
* A `toScalaFactory(Accumulator)` call automatically converts the `IntStream` to a primitive
* [[IntAccumulator]].
*
* When converting a parallel stream to a different Scala collection, the stream is first
Expand All @@ -291,25 +309,44 @@ trait StreamExtensions {
* Sequential streams are directly converted to the target collection. If the target collection
* is lazy, the conversion is lazy as well.
*/
def toScala[C1](factory: collection.Factory[Int, C1])(implicit info: AccumulatorFactoryInfo[Int, C1]): C1 = {
private[java8] def toScalaFactory[C](factory: collection.Factory[Int, C])(implicit info: AccumulatorFactoryInfo[Int, C]): C = {
def intAcc = stream.collect(IntAccumulator.supplier, IntAccumulator.adder, IntAccumulator.merger)
if (info.companion == AnyAccumulator) stream.collect(AnyAccumulator.supplier[Int], AnyAccumulator.unboxedIntAdder, AnyAccumulator.merger[Int]).asInstanceOf[C1]
else if (info.companion == IntAccumulator) intAcc.asInstanceOf[C1]
if (info.companion == AnyAccumulator) stream.collect(AnyAccumulator.supplier[Int], AnyAccumulator.unboxedIntAdder, AnyAccumulator.merger[Int]).asInstanceOf[C]
else if (info.companion == IntAccumulator) intAcc.asInstanceOf[C]
else if (stream.isParallel) intAcc.to(factory)
else factory.fromSpecific(stream.iterator.asInstanceOf[java.util.Iterator[Int]].asScala)
}

/**
* Copy the elements of this stream into a Scala collection.
*
* For parallel streams, using [[accumulate]] is recommended as it builds the [[IntAccumulator]]
* in parallel.
*
* When converting a parallel stream to a different Scala collection, the stream is first
* converted into an [[Accumulator]], which supports parallel building. The accumulator is
* then converted to the target collection. Note that the stream is processed eagerly while
* building the accumulator, even if the target collection is lazy.
*
* Sequential streams are directly converted to the target collection. If the target collection
* is lazy, the conversion is lazy as well.
*/
def toScala[CC[_]](implicit factory: collection.Factory[Int, CC[Int]]): CC[Int] = {
if (stream.isParallel) toScalaFactory(IntAccumulator).to(factory)
else factory.fromSpecific(stream.iterator.asInstanceOf[java.util.Iterator[Int]].asScala)
}
}

implicit class LongStreamHasToScala(stream: LongStream) {
def accumulate: LongAccumulator = toScala(LongAccumulator)
def accumulate: LongAccumulator = toScalaFactory(LongAccumulator)

/**
* Copy the elements of this stream into a Scala collection.
*
* Converting a parallel streams to an [[Accumulator]] using `stream.toScala(Accumulator)`
* Converting a parallel streams to an [[Accumulator]] using `stream.toScalaFactory(Accumulator)`
* builds the result in parallel.
*
* A `toScala(Accumulator)` call automatically converts the `LongStream` to a primitive
* A `toScalaFactory(Accumulator)` call automatically converts the `LongStream` to a primitive
* [[LongAccumulator]].
*
* When converting a parallel stream to a different Scala collection, the stream is first
Expand All @@ -320,25 +357,44 @@ trait StreamExtensions {
* Sequential streams are directly converted to the target collection. If the target collection
* is lazy, the conversion is lazy as well.
*/
def toScala[C1](factory: collection.Factory[Long, C1])(implicit info: AccumulatorFactoryInfo[Long, C1]): C1 = {
private[java8] def toScalaFactory[C](factory: collection.Factory[Long, C])(implicit info: AccumulatorFactoryInfo[Long, C]): C = {
def longAcc = stream.collect(LongAccumulator.supplier, LongAccumulator.adder, LongAccumulator.merger)
if (info.companion == AnyAccumulator) stream.collect(AnyAccumulator.supplier[Long], AnyAccumulator.unboxedLongAdder, AnyAccumulator.merger[Long]).asInstanceOf[C1]
else if (info.companion == LongAccumulator) longAcc.asInstanceOf[C1]
if (info.companion == AnyAccumulator) stream.collect(AnyAccumulator.supplier[Long], AnyAccumulator.unboxedLongAdder, AnyAccumulator.merger[Long]).asInstanceOf[C]
else if (info.companion == LongAccumulator) longAcc.asInstanceOf[C]
else if (stream.isParallel) longAcc.to(factory)
else factory.fromSpecific(stream.iterator.asInstanceOf[java.util.Iterator[Long]].asScala)
}

/**
* Copy the elements of this stream into a Scala collection.
*
* For parallel streams, using [[accumulate]] is recommended as it builds the [[LongAccumulator]]
* in parallel.
*
* When converting a parallel stream to a different Scala collection, the stream is first
* converted into an [[Accumulator]], which supports parallel building. The accumulator is
* then converted to the target collection. Note that the stream is processed eagerly while
* building the accumulator, even if the target collection is lazy.
*
* Sequential streams are directly converted to the target collection. If the target collection
* is lazy, the conversion is lazy as well.
*/
def toScala[CC[_]](implicit factory: collection.Factory[Long, CC[Long]]): CC[Long] = {
if (stream.isParallel) toScalaFactory(LongAccumulator).to(factory)
else factory.fromSpecific(stream.iterator.asInstanceOf[java.util.Iterator[Long]].asScala)
}
}

implicit class DoubleStreamHasToScala(stream: DoubleStream) {
def accumulate: DoubleAccumulator = toScala(DoubleAccumulator)
def accumulate: DoubleAccumulator = toScalaFactory(DoubleAccumulator)

/**
* Copy the elements of this stream into a Scala collection.
*
* Converting a parallel streams to an [[Accumulator]] using `stream.toScala(Accumulator)`
* Converting a parallel streams to an [[Accumulator]] using `stream.toScalaFactory(Accumulator)`
* builds the result in parallel.
*
* A `toScala(Accumulator)` call automatically converts the `DoubleStream` to a primitive
* A `toScalaFactory(Accumulator)` call automatically converts the `DoubleStream` to a primitive
* [[DoubleAccumulator]].
*
* When converting a parallel stream to a different Scala collection, the stream is first
Expand All @@ -349,13 +405,32 @@ trait StreamExtensions {
* Sequential streams are directly converted to the target collection. If the target collection
* is lazy, the conversion is lazy as well.
*/
def toScala[C1](factory: collection.Factory[Double, C1])(implicit info: AccumulatorFactoryInfo[Double, C1]): C1 = {
private[java8] def toScalaFactory[C](factory: collection.Factory[Double, C])(implicit info: AccumulatorFactoryInfo[Double, C]): C = {
def doubleAcc = stream.collect(DoubleAccumulator.supplier, DoubleAccumulator.adder, DoubleAccumulator.merger)
if (info.companion == AnyAccumulator) stream.collect(AnyAccumulator.supplier[Double], AnyAccumulator.unboxedDoubleAdder, AnyAccumulator.merger[Double]).asInstanceOf[C1]
else if (info.companion == DoubleAccumulator) doubleAcc.asInstanceOf[C1]
if (info.companion == AnyAccumulator) stream.collect(AnyAccumulator.supplier[Double], AnyAccumulator.unboxedDoubleAdder, AnyAccumulator.merger[Double]).asInstanceOf[C]
else if (info.companion == DoubleAccumulator) doubleAcc.asInstanceOf[C]
else if (stream.isParallel) doubleAcc.to(factory)
else factory.fromSpecific(stream.iterator.asInstanceOf[java.util.Iterator[Double]].asScala)
}

/**
* Copy the elements of this stream into a Scala collection.
*
* For parallel streams, using [[accumulate]] is recommended as it builds the [[DoubleAccumulator]]
* in parallel.
*
* When converting a parallel stream to a different Scala collection, the stream is first
* converted into an [[Accumulator]], which supports parallel building. The accumulator is
* then converted to the target collection. Note that the stream is processed eagerly while
* building the accumulator, even if the target collection is lazy.
*
* Sequential streams are directly converted to the target collection. If the target collection
* is lazy, the conversion is lazy as well.
*/
def toScala[CC[_]](implicit factory: collection.Factory[Double, CC[Double]]): CC[Double] = {
if (stream.isParallel) toScalaFactory(DoubleAccumulator).to(factory)
else factory.fromSpecific(stream.iterator.asInstanceOf[java.util.Iterator[Double]].asScala)
}
}
}

Expand Down
Loading