diff --git a/src/main/scala/scala/concurrent/java8/FutureConvertersImpl.scala b/src/main/scala/scala/concurrent/java8/FutureConvertersImpl.scala index 5c907e7..1ee934e 100644 --- a/src/main/scala/scala/concurrent/java8/FutureConvertersImpl.scala +++ b/src/main/scala/scala/concurrent/java8/FutureConvertersImpl.scala @@ -6,7 +6,7 @@ package scala.concurrent.java8 // Located in this package to access private[concurrent] members import scala.concurrent.{ Future, Promise, ExecutionContext, ExecutionContextExecutorService, ExecutionContextExecutor, impl } -import java.util.concurrent.{ CompletionStage, Executor, ExecutorService, CompletableFuture } +import java.util.concurrent._ import scala.util.{ Try, Success, Failure } import java.util.function.{ BiConsumer, Function ⇒ JF, Consumer, BiFunction } @@ -66,8 +66,21 @@ object FuturesConvertersImpl { cf } - override def toCompletableFuture(): CompletableFuture[T] = - throw new UnsupportedOperationException("this CompletionStage represents a read-only Scala Future") + /** + * @inheritdoc + * + * WARNING: completing the result of this method will not complete the underlying + * Scala Future or Promise (ie, the one that that was passed to `toJava`.) + */ + override def toCompletableFuture(): CompletableFuture[T] = this + + override def obtrudeValue(value: T): Unit = throw new UnsupportedOperationException("obtrudeValue may not be used on the result of toJava(scalaFuture)") + + override def obtrudeException(ex: Throwable): Unit = throw new UnsupportedOperationException("obtrudeException may not be used on the result of toJava(scalaFuture)") + + override def get(): T = scala.concurrent.blocking(super.get()) + + override def get(timeout: Long, unit: TimeUnit): T = scala.concurrent.blocking(super.get(timeout, unit)) override def toString: String = super[CompletableFuture].toString } diff --git a/src/test/java/scala/compat/java8/FutureConvertersTest.java b/src/test/java/scala/compat/java8/FutureConvertersTest.java index a8d4861..7735aba 100644 --- a/src/test/java/scala/compat/java8/FutureConvertersTest.java +++ b/src/test/java/scala/compat/java8/FutureConvertersTest.java @@ -7,11 +7,9 @@ import scala.concurrent.Future; import scala.concurrent.Promise; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.*; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import static org.junit.Assert.*; import static scala.compat.java8.FutureConverters.*; @@ -318,4 +316,86 @@ public void testToJavaExceptionally() throws InterruptedException, latch.countDown(); assertEquals("Hello", second.toCompletableFuture().get()); } + + @Test + public void testToJavaThenComposeWithToJavaThenAccept() throws InterruptedException, + ExecutionException, TimeoutException { + // Test case from https://github.com/scala/scala-java8-compat/issues/29 + final Promise p1 = promise(); + final CompletableFuture future = new CompletableFuture<>(); + + CompletableFuture.supplyAsync(() -> "Hello"). + thenCompose(x -> toJava(p1.future())).handle((x, t) -> future.complete(x)); + p1.success("Hello"); + assertEquals("Hello", future.get(1000, MILLISECONDS)); + } + + @Test + public void testToJavaToCompletableFuture() throws ExecutionException, InterruptedException { + final Promise p = promise(); + final CompletionStage cs = toJava(p.future()); + CompletableFuture cf = cs.toCompletableFuture(); + assertEquals("notyet", cf.getNow("notyet")); + p.success("done"); + assertEquals("done", cf.get()); + } + + @Test + public void testToJavaToCompletableFutureDoesNotMutateUnderlyingPromise() throws ExecutionException, InterruptedException { + final Promise p = promise(); + Future sf = p.future(); + final CompletionStage cs = toJava(sf); + CompletableFuture cf = cs.toCompletableFuture(); + assertEquals("notyet", cf.getNow("notyet")); + cf.complete("done"); + assertEquals("done", cf.get()); + assertFalse(sf.isCompleted()); + assertFalse(p.isCompleted()); + } + + @Test + public void testToJavaToCompletableFutureJavaCompleteCalledAfterScalaComplete() throws ExecutionException, InterruptedException { + final Promise p = promise(); + Future sf = p.future(); + final CompletionStage cs = toJava(sf); + CompletableFuture cf = cs.toCompletableFuture(); + assertEquals("notyet", cf.getNow("notyet")); + p.success("scaladone"); + assertEquals("scaladone", cf.get()); + cf.complete("javadone"); + assertEquals("scaladone", cf.get()); + } + + @Test + public void testToJavaToCompletableFutureJavaCompleteCalledBeforeScalaComplete() throws ExecutionException, InterruptedException { + final Promise p = promise(); + Future sf = p.future(); + final CompletionStage cs = toJava(sf); + CompletableFuture cf = cs.toCompletableFuture(); + assertEquals("notyet", cf.getNow("notyet")); + cf.complete("javadone"); + assertEquals("javadone", cf.get()); + p.success("scaladone"); + assertEquals("javadone", cf.get()); + } + + @Test + public void testToJavaToCompletableFutureJavaObtrudeCalledBeforeScalaComplete() throws ExecutionException, InterruptedException { + final Promise p = promise(); + Future sf = p.future(); + final CompletionStage cs = toJava(sf); + CompletableFuture cf = cs.toCompletableFuture(); + try { + cf.obtrudeValue(""); + fail(); + } catch (UnsupportedOperationException iae) { + // okay + } + try { + cf.obtrudeException(new Exception()); + fail(); + } catch (UnsupportedOperationException iae) { + // okay + } + } }