From 56ae6695d47360ed4b1eaccd6e000c8b41a10fe3 Mon Sep 17 00:00:00 2001 From: Sebastien Deleuze Date: Fri, 5 Apr 2019 16:17:46 +0200 Subject: [PATCH] Add RowsFetchSpec.allAsFlow() extension --- .../r2dbc/function/RowsFetchSpecExtensions.kt | 15 +++++++++++++-- .../function/RowsFetchSpecExtensionsTests.kt | 19 +++++++++++++++++++ 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/src/main/kotlin/org/springframework/data/r2dbc/function/RowsFetchSpecExtensions.kt b/src/main/kotlin/org/springframework/data/r2dbc/function/RowsFetchSpecExtensions.kt index 34a09aef..dc50f26a 100644 --- a/src/main/kotlin/org/springframework/data/r2dbc/function/RowsFetchSpecExtensions.kt +++ b/src/main/kotlin/org/springframework/data/r2dbc/function/RowsFetchSpecExtensions.kt @@ -15,8 +15,11 @@ */ package org.springframework.data.r2dbc.function +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.reactive.awaitFirstOrNull import kotlinx.coroutines.reactive.awaitSingle +import kotlinx.coroutines.reactive.flow.asFlow /** * Non-nullable Coroutines variant of [RowsFetchSpec.one]. @@ -50,5 +53,13 @@ suspend fun RowsFetchSpec.awaitFirst(): T = suspend fun RowsFetchSpec.awaitFirstOrNull(): T? = first().awaitFirstOrNull() -// TODO Coroutines variant of [RowsFetchSpec.all], depends on [kotlinx.coroutines#254](https://github.com/Kotlin/kotlinx.coroutines/issues/254). -// suspend fun RowsFetchSpec.awaitAll() = all()... +/** + * Coroutines [Flow] variant of [RowsFetchSpec.all]. + * + * Backpressure is controlled by [batchSize] parameter that controls the size of in-flight elements + * and [org.reactivestreams.Subscription.request] size. + * + * @author Sebastien Deleuze + */ +@FlowPreview +fun RowsFetchSpec.allAsFlow(batchSize: Int = 1): Flow = all().asFlow(batchSize) diff --git a/src/test/kotlin/org/springframework/data/r2dbc/function/RowsFetchSpecExtensionsTests.kt b/src/test/kotlin/org/springframework/data/r2dbc/function/RowsFetchSpecExtensionsTests.kt index da60d9a4..01d55e6e 100644 --- a/src/test/kotlin/org/springframework/data/r2dbc/function/RowsFetchSpecExtensionsTests.kt +++ b/src/test/kotlin/org/springframework/data/r2dbc/function/RowsFetchSpecExtensionsTests.kt @@ -18,10 +18,13 @@ package org.springframework.data.r2dbc.function import io.mockk.every import io.mockk.mockk import io.mockk.verify +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.flow.toList import kotlinx.coroutines.runBlocking import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThatExceptionOfType import org.junit.Test +import reactor.core.publisher.Flux import reactor.core.publisher.Mono /** @@ -150,4 +153,20 @@ class RowsFetchSpecExtensionsTests { spec.first() } } + + @Test + @FlowPreview + fun allAsFlow() { + + val spec = mockk>() + every { spec.all() } returns Flux.just("foo", "bar", "baz") + + runBlocking { + assertThat(spec.allAsFlow().toList()).contains("foo", "bar", "baz") + } + + verify { + spec.all() + } + } }