Skip to content

Commit c271438

Browse files
author
Lucas Sales
committed
Tweak collectFlow
1 parent 78552ef commit c271438

File tree

3 files changed

+246
-63
lines changed

3 files changed

+246
-63
lines changed

app/src/main/java/com/monstarlab/arch/extensions/FlowExtensions.kt

Lines changed: 67 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,75 @@
11
package com.monstarlab.arch.extensions
22

3+
import androidx.lifecycle.Lifecycle
34
import kotlinx.coroutines.CoroutineScope
4-
import kotlinx.coroutines.flow.Flow
5-
import kotlinx.coroutines.flow.collect
6-
import kotlinx.coroutines.flow.combine
7-
import kotlinx.coroutines.flow.flow
5+
import kotlinx.coroutines.ExperimentalCoroutinesApi
6+
import kotlinx.coroutines.flow.*
87
import kotlinx.coroutines.launch
98

9+
/**
10+
* Flow operator that emits values from `this` upstream Flow when the [lifecycle] is
11+
* at least at [minActiveState] state. The emissions will be stopped when the lifecycle state
12+
* falls below [minActiveState] state.
13+
*
14+
* The flow will automatically start and cancel collecting from `this` upstream flow as the
15+
* [lifecycle] moves in and out of the target state.
16+
*
17+
* If [this] upstream Flow completes emitting items, `flowWithLifecycle` will trigger the flow
18+
* collection again when the [minActiveState] state is reached.
19+
*
20+
* This is NOT a terminal operator. This operator is usually followed by [collect], or
21+
* [onEach] and [launchIn] to process the emitted values.
22+
*
23+
* Note: this operator creates a hot flow that only closes when the [lifecycle] is destroyed or
24+
* the coroutine that collects from the flow is cancelled.
25+
*
26+
* ```
27+
* class MyActivity : AppCompatActivity() {
28+
* override fun onCreate(savedInstanceState: Bundle?) {
29+
* /* ... */
30+
* // Launches a coroutine that collects items from a flow when the Activity
31+
* // is at least started. It will automatically cancel when the activity is stopped and
32+
* // start collecting again whenever it's started again.
33+
* lifecycleScope.launch {
34+
* flow
35+
* .flowWithLifecycle(lifecycle, Lifecycle.State.STARTED)
36+
* .collect {
37+
* // Consume flow emissions
38+
* }
39+
* }
40+
* }
41+
* }
42+
* ```
43+
*
44+
* Warning: [Lifecycle.State.INITIALIZED] is not allowed in this API. Passing it as a
45+
* parameter will throw an [IllegalArgumentException].
46+
*
47+
* Tip: If multiple flows need to be collected using `flowWithLifecycle`, consider using
48+
* the [LifecycleOwner.addRepeatingJob] API to collect from all of them using a different
49+
* [launch] per flow instead. This will be more efficient as only one [LifecycleObserver] will be
50+
* added to the [lifecycle] instead of one per flow.
51+
*
52+
* @param lifecycle The [Lifecycle] where the restarting collecting from `this` flow work will be
53+
* kept alive.
54+
* @param minActiveState [Lifecycle.State] in which the upstream flow gets collected. The
55+
* collection will stop if the lifecycle falls below that state, and will restart if it's in that
56+
* state again.
57+
* @return [Flow] that only emits items from `this` upstream flow when the [lifecycle] is at
58+
* least in the [minActiveState].
59+
*/
60+
@OptIn(ExperimentalCoroutinesApi::class)
61+
public fun <T> Flow<T>.flowWithLifecycle(
62+
lifecycle: Lifecycle,
63+
minActiveState: Lifecycle.State = Lifecycle.State.STARTED
64+
): Flow<T> = callbackFlow {
65+
lifecycle.repeatOnLifecycle(minActiveState) {
66+
this@flowWithLifecycle.collect {
67+
send(it)
68+
}
69+
}
70+
close()
71+
}
72+
1073
fun <T1, T2> CoroutineScope.combineFlows(flow1: Flow<T1>, flow2: Flow<T2>, collectBlock: (suspend (T1, T2) -> Unit)) {
1174
launch {
1275
flow1.combine(flow2) { v1, v2 ->
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
package com.monstarlab.arch.extensions
2+
3+
/*
4+
* Copyright 2021 The Android Open Source Project
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
import androidx.lifecycle.Lifecycle
20+
import androidx.lifecycle.LifecycleEventObserver
21+
import androidx.lifecycle.LifecycleOwner
22+
import androidx.lifecycle.lifecycleScope
23+
import kotlinx.coroutines.CoroutineScope
24+
import kotlinx.coroutines.Dispatchers
25+
import kotlinx.coroutines.Job
26+
import kotlinx.coroutines.coroutineScope
27+
import kotlinx.coroutines.launch
28+
import kotlinx.coroutines.suspendCancellableCoroutine
29+
import kotlinx.coroutines.withContext
30+
import kotlin.coroutines.CoroutineContext
31+
import kotlin.coroutines.EmptyCoroutineContext
32+
import kotlin.coroutines.resume
33+
/**
34+
* Launches and runs the given [block] in a coroutine when `this` [LifecycleOwner]'s [Lifecycle]
35+
* is at least at [state]. The launched coroutine will be cancelled when the lifecycle state falls
36+
* below [state].
37+
*
38+
* The [block] will cancel and re-launch as the lifecycle moves in and out of the target state.
39+
* To permanently remove the work from the lifecycle, [Job.cancel] the returned [Job].
40+
*
41+
* ```
42+
* // Runs the block of code in a coroutine when the lifecycleOwner is at least STARTED.
43+
* // The coroutine will be cancelled when the ON_STOP event happens and will restart executing
44+
* // if the lifecycleOwner's lifecycle receives the ON_START event again.
45+
* lifecycleOwner.addRepeatingJob(Lifecycle.State.STARTED) {
46+
* uiStateFlow.collect { uiState ->
47+
* updateUi(uiState)
48+
* }
49+
* }
50+
* ```
51+
*
52+
* The best practice is to call this function when the lifecycleOwner is initialized. For
53+
* example, `onCreate` in an Activity, or `onViewCreated` in a Fragment. Otherwise, multiple
54+
* repeating jobs doing the same could be registered and be executed at the same time.
55+
*
56+
* Warning: [Lifecycle.State.INITIALIZED] is not allowed in this API. Passing it as a
57+
* parameter will throw an [IllegalArgumentException].
58+
*
59+
* @see Lifecycle.repeatOnLifecycle for details
60+
*
61+
* @param state [Lifecycle.State] in which the coroutine running [block] starts. That coroutine
62+
* will cancel if the lifecycle falls below that state, and will restart if it's in that state
63+
* again.
64+
* @param coroutineContext [CoroutineContext] used to execute [block].
65+
* @param block The block to run when the lifecycle is at least in [state] state.
66+
* @return [Job] to manage the repeating work.
67+
*/
68+
public fun LifecycleOwner.addRepeatingJob(
69+
state: Lifecycle.State,
70+
coroutineContext: CoroutineContext = EmptyCoroutineContext,
71+
block: suspend CoroutineScope.() -> Unit
72+
): Job = lifecycleScope.launch(coroutineContext) {
73+
lifecycle.repeatOnLifecycle(state, block)
74+
}
75+
/**
76+
* Runs the given [block] in a new coroutine when `this` [Lifecycle] is at least at [state] and
77+
* suspends the execution until `this` [Lifecycle] is [Lifecycle.State.DESTROYED].
78+
*
79+
* The [block] will cancel and re-launch as the lifecycle moves in and out of the target state.
80+
*
81+
* Warning: [Lifecycle.State.INITIALIZED] is not allowed in this API. Passing it as a
82+
* parameter will throw an [IllegalArgumentException].
83+
*
84+
* @param state [Lifecycle.State] in which `block` runs in a new coroutine. That coroutine
85+
* will cancel if the lifecycle falls below that state, and will restart if it's in that state
86+
* again.
87+
* @param block The block to run when the lifecycle is at least in [state] state.
88+
*/
89+
public suspend fun Lifecycle.repeatOnLifecycle(
90+
state: Lifecycle.State,
91+
block: suspend CoroutineScope.() -> Unit
92+
) {
93+
require(state !== Lifecycle.State.INITIALIZED) {
94+
"repeatOnLifecycle cannot start work with the INITIALIZED lifecycle state."
95+
}
96+
if (currentState === Lifecycle.State.DESTROYED) {
97+
return
98+
}
99+
coroutineScope {
100+
withContext(Dispatchers.Main.immediate) {
101+
// Check the current state of the lifecycle as the previous check is not guaranteed
102+
// to be done on the main thread.
103+
if (currentState === Lifecycle.State.DESTROYED) return@withContext
104+
// Instance of the running repeating coroutine
105+
var launchedJob: Job? = null
106+
// Registered observer
107+
var observer: LifecycleEventObserver? = null
108+
try {
109+
// Suspend the coroutine until the lifecycle is destroyed or
110+
// the coroutine is cancelled
111+
suspendCancellableCoroutine<Unit> { cont ->
112+
// Lifecycle observers that executes `block` when the lifecycle reaches certain state, and
113+
// cancels when it moves falls below that state.
114+
val startWorkEvent = Lifecycle.Event.upTo(state)
115+
val cancelWorkEvent = Lifecycle.Event.downFrom(state)
116+
observer = LifecycleEventObserver { _, event ->
117+
if (event == startWorkEvent) {
118+
// Launch the repeating work preserving the calling context
119+
launchedJob = this@coroutineScope.launch(block = block)
120+
return@LifecycleEventObserver
121+
}
122+
if (event == cancelWorkEvent) {
123+
launchedJob?.cancel()
124+
launchedJob = null
125+
}
126+
if (event == Lifecycle.Event.ON_DESTROY) {
127+
cont.resume(Unit)
128+
}
129+
}
130+
this@repeatOnLifecycle.addObserver(observer as LifecycleEventObserver)
131+
}
132+
} finally {
133+
launchedJob?.cancel()
134+
observer?.let {
135+
this@repeatOnLifecycle.removeObserver(it)
136+
}
137+
}
138+
}
139+
}
140+
}

app/src/main/java/com/monstarlab/arch/extensions/ViewExtensions.kt

Lines changed: 39 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,17 @@ package com.monstarlab.arch.extensions
33
import android.view.View
44
import androidx.core.view.isVisible
55
import androidx.fragment.app.Fragment
6-
import androidx.lifecycle.DefaultLifecycleObserver
7-
import androidx.lifecycle.LifecycleOwner
86
import androidx.lifecycle.lifecycleScope
97
import com.google.android.material.snackbar.Snackbar
108
import com.monstarlab.core.sharedui.errorhandling.ViewError
119
import kotlinx.coroutines.channels.awaitClose
1210
import kotlinx.coroutines.flow.*
1311

14-
fun Fragment.snackErrorFlow(targetFlow: SharedFlow<ViewError>, root: View, length: Int = Snackbar.LENGTH_SHORT) {
12+
fun Fragment.snackErrorFlow(
13+
targetFlow: SharedFlow<ViewError>,
14+
root: View,
15+
length: Int = Snackbar.LENGTH_SHORT
16+
) {
1517
collectFlow(targetFlow) { viewError ->
1618
Snackbar.make(root, viewError.message, length).show()
1719
}
@@ -24,74 +26,52 @@ fun Fragment.visibilityFlow(targetFlow: Flow<Boolean>, vararg view: View) {
2426
}
2527

2628
fun <T> Fragment.collectFlow(targetFlow: Flow<T>, collectBlock: ((T) -> Unit)) {
27-
safeViewCollect {
28-
viewLifecycleOwner.lifecycleScope.launchWhenCreated {
29-
targetFlow.collect {
30-
collectBlock.invoke(it)
29+
lifecycleScope.launchWhenStarted {
30+
targetFlow.flowWithLifecycle(viewLifecycleOwner.lifecycle)
31+
.collect {
32+
collectBlock(it)
3133
}
32-
}
3334
}
3435
}
3536

36-
private inline fun Fragment.safeViewCollect(crossinline viewOwner: LifecycleOwner.() -> Unit) {
37-
lifecycle.addObserver(object : DefaultLifecycleObserver {
38-
override fun onCreate(owner: LifecycleOwner) {
39-
viewLifecycleOwnerLiveData.observe(
40-
this@safeViewCollect,
41-
{ viewLifecycleOwner ->
42-
viewLifecycleOwner.viewOwner()
43-
}
44-
)
45-
}
46-
})
47-
}
4837

49-
fun <T1, T2> Fragment.combineFlows(flow1: Flow<T1>, flow2: Flow<T2>, collectBlock: ((T1, T2) -> Unit)) {
50-
safeViewCollect {
51-
viewLifecycleOwner.lifecycleScope.launchWhenCreated {
52-
flow1.combine(flow2) { v1, v2 ->
53-
collectBlock.invoke(v1, v2)
54-
}.collect {
55-
// Empty collect block to trigger ^
56-
}
57-
}
58-
}
38+
fun <T1, T2> Fragment.combineFlows(
39+
flow1: Flow<T1>,
40+
flow2: Flow<T2>,
41+
collectBlock: ((T1, T2) -> Unit)
42+
) {
43+
collectFlow(flow1.combine(flow2) { v1, v2 ->
44+
collectBlock.invoke(v1, v2)
45+
}) {}
5946
}
6047

61-
fun <T1, T2, T3> Fragment.combineFlows(flow1: Flow<T1>, flow2: Flow<T2>, flow3: Flow<T3>, collectBlock: ((T1, T2, T3) -> Unit)) {
62-
safeViewCollect {
63-
viewLifecycleOwner.lifecycleScope.launchWhenCreated {
64-
combine(flow1, flow2, flow3) { v1, v2, v3 ->
65-
collectBlock.invoke(v1, v2, v3)
66-
}.collect {
67-
// Empty collect block to trigger ^
68-
}
69-
}
70-
}
48+
fun <T1, T2, T3> Fragment.combineFlows(
49+
flow1: Flow<T1>,
50+
flow2: Flow<T2>,
51+
flow3: Flow<T3>,
52+
collectBlock: ((T1, T2, T3) -> Unit)
53+
) {
54+
collectFlow(combine(flow1, flow2, flow3) { v1, v2, v3 ->
55+
collectBlock.invoke(v1, v2, v3)
56+
}) {}
7157
}
7258

73-
fun <T1, T2, T3, T4> Fragment.combineFlows(flow1: Flow<T1>, flow2: Flow<T2>, flow3: Flow<T3>, flow4: Flow<T4>, collectBlock: ((T1, T2, T3, T4) -> Unit)) {
74-
safeViewCollect {
75-
viewLifecycleOwner.lifecycleScope.launchWhenCreated {
76-
combine(flow1, flow2, flow3, flow4) { v1, v2, v3, v4 ->
77-
collectBlock.invoke(v1, v2, v3, v4)
78-
}.collect {
79-
// Empty collect block to trigger ^
80-
}
81-
}
82-
}
59+
fun <T1, T2, T3, T4> Fragment.combineFlows(
60+
flow1: Flow<T1>,
61+
flow2: Flow<T2>,
62+
flow3: Flow<T3>,
63+
flow4: Flow<T4>,
64+
collectBlock: ((T1, T2, T3, T4) -> Unit)
65+
) {
66+
collectFlow(combine(flow1, flow2, flow3, flow4) { v1, v2, v3, v4 ->
67+
collectBlock.invoke(v1, v2, v3, v4)
68+
}) {}
8369
}
8470

8571
fun <T1, T2> Fragment.zipFlows(flow1: Flow<T1>, flow2: Flow<T2>, collectBlock: ((T1, T2) -> Unit)) {
86-
safeViewCollect {
87-
viewLifecycleOwner.lifecycleScope.launchWhenCreated {
88-
flow1.zip(flow2) { v1, v2 ->
89-
collectBlock.invoke(v1, v2)
90-
}.collect {
91-
// Empty collect block to trigger ^
92-
}
93-
}
94-
}
72+
collectFlow(flow1.zip(flow2) { v1, v2 ->
73+
collectBlock.invoke(v1, v2)
74+
}) {}
9575
}
9676

9777
fun View.clicks(throttleTime: Long = 400): Flow<Unit> = callbackFlow {

0 commit comments

Comments
 (0)