Skip to content

Commit 96f8d6b

Browse files
authored
SharedFlow (#6)
migrate BroadcastChannel to SharedFlow
1 parent a27dfa4 commit 96f8d6b

File tree

4 files changed

+43
-50
lines changed

4 files changed

+43
-50
lines changed

buildSrc/src/main/kotlin/deps.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ object deps {
4545
}
4646

4747
object jetbrains {
48-
private const val version = "1.3.9"
48+
private const val version = "1.4.0-M1"
4949

5050
const val coroutinesCore = "org.jetbrains.kotlinx:kotlinx-coroutines-core:$version"
5151
const val coroutinesAndroid = "org.jetbrains.kotlinx:kotlinx-coroutines-android:$version"

data/src/main/java/com/hoc/flowmvi/data/UserRepositoryImpl.kt

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,9 @@ import com.hoc.flowmvi.domain.entity.User
1010
import com.hoc.flowmvi.domain.repository.UserRepository
1111
import kotlinx.coroutines.ExperimentalCoroutinesApi
1212
import kotlinx.coroutines.FlowPreview
13-
import kotlinx.coroutines.channels.BroadcastChannel
14-
import kotlinx.coroutines.channels.Channel
1513
import kotlinx.coroutines.delay
1614
import kotlinx.coroutines.flow.Flow
17-
import kotlinx.coroutines.flow.asFlow
15+
import kotlinx.coroutines.flow.MutableSharedFlow
1816
import kotlinx.coroutines.flow.emitAll
1917
import kotlinx.coroutines.flow.flow
2018
import kotlinx.coroutines.flow.onEach
@@ -36,7 +34,7 @@ internal class UserRepositoryImpl constructor(
3634
class Added(val user: User) : Change()
3735
}
3836

39-
private val changesChannel = BroadcastChannel<Change>(Channel.CONFLATED)
37+
private val changesFlow = MutableSharedFlow<Change>()
4038

4139
private suspend fun getUsersFromRemote(): List<User> {
4240
return withContext(dispatchers.io) {
@@ -49,8 +47,7 @@ internal class UserRepositoryImpl constructor(
4947
return flow {
5048
val initial = getUsersFromRemote()
5149

52-
changesChannel
53-
.asFlow()
50+
changesFlow
5451
.onEach { Log.d("###", "[USER_REPO] Change=$it") }
5552
.scan(initial) { acc, change ->
5653
when (change) {
@@ -65,20 +62,20 @@ internal class UserRepositoryImpl constructor(
6562
}
6663

6764
override suspend fun refresh() =
68-
getUsersFromRemote().let { changesChannel.send(Change.Refreshed(it)) }
65+
getUsersFromRemote().let { changesFlow.emit(Change.Refreshed(it)) }
6966

7067
override suspend fun remove(user: User) {
7168
withContext(dispatchers.io) {
7269
val response = userApiService.remove(domainToResponse(user).id)
73-
changesChannel.send(Change.Removed(responseToDomain(response)))
70+
changesFlow.emit(Change.Removed(responseToDomain(response)))
7471
}
7572
}
7673

7774
override suspend fun add(user: User) {
7875
withContext(dispatchers.io) {
7976
val body = domainToBody(user).copy(avatar = avatarUrls.random())
8077
val response = userApiService.add(body)
81-
changesChannel.send(Change.Added(responseToDomain(response)))
78+
changesFlow.emit(Change.Added(responseToDomain(response)))
8279
delay(400)
8380
}
8481
}

feature-add/src/main/java/com/hoc/flowmvi/ui/add/AddVM.kt

Lines changed: 17 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.hoc.flowmvi.ui.add
22

3+
import android.util.Log
34
import androidx.core.util.PatternsCompat
45
import androidx.lifecycle.ViewModel
56
import androidx.lifecycle.viewModelScope
@@ -9,50 +10,42 @@ import com.hoc.flowmvi.domain.entity.User
910
import com.hoc.flowmvi.domain.usecase.AddUserUseCase
1011
import kotlinx.coroutines.ExperimentalCoroutinesApi
1112
import kotlinx.coroutines.FlowPreview
12-
import kotlinx.coroutines.channels.BroadcastChannel
13-
import kotlinx.coroutines.channels.Channel
1413
import kotlinx.coroutines.flow.Flow
15-
import kotlinx.coroutines.flow.MutableStateFlow
14+
import kotlinx.coroutines.flow.MutableSharedFlow
15+
import kotlinx.coroutines.flow.SharingStarted
1616
import kotlinx.coroutines.flow.StateFlow
17-
import kotlinx.coroutines.flow.asFlow
1817
import kotlinx.coroutines.flow.catch
1918
import kotlinx.coroutines.flow.combine
2019
import kotlinx.coroutines.flow.filter
2120
import kotlinx.coroutines.flow.filterIsInstance
2221
import kotlinx.coroutines.flow.flow
23-
import kotlinx.coroutines.flow.launchIn
2422
import kotlinx.coroutines.flow.map
2523
import kotlinx.coroutines.flow.merge
2624
import kotlinx.coroutines.flow.onEach
2725
import kotlinx.coroutines.flow.onStart
2826
import kotlinx.coroutines.flow.scan
27+
import kotlinx.coroutines.flow.shareIn
28+
import kotlinx.coroutines.flow.stateIn
2929

3030
@FlowPreview
3131
@ExperimentalCoroutinesApi
3232
internal class AddVM(private val addUser: AddUserUseCase) : ViewModel() {
33-
private val _eventChannel = BroadcastChannel<SingleEvent>(capacity = Channel.BUFFERED)
34-
private val _intentChannel = BroadcastChannel<ViewIntent>(capacity = Channel.BUFFERED)
33+
private val _eventFlow = MutableSharedFlow<SingleEvent>()
34+
private val _intentFlow = MutableSharedFlow<ViewIntent>()
3535

3636
val viewState: StateFlow<ViewState>
37+
val singleEvent: Flow<SingleEvent> get() = _eventFlow
3738

38-
val singleEvent: Flow<SingleEvent>
39-
40-
suspend fun processIntent(intent: ViewIntent) = _intentChannel.send(intent)
39+
suspend fun processIntent(intent: ViewIntent) = _intentFlow.emit(intent)
4140

4241
init {
4342
val initialVS = ViewState.initial()
44-
45-
viewState = MutableStateFlow(initialVS)
46-
singleEvent = _eventChannel.asFlow()
47-
48-
_intentChannel
49-
.asFlow()
43+
viewState = _intentFlow
5044
.toPartialStateChangesFlow()
5145
.sendSingleEvent()
5246
.scan(initialVS) { state, change -> change.reduce(state) }
53-
.onEach { viewState.value = it }
54-
.catch { }
55-
.launchIn(viewModelScope)
47+
.catch { Log.d("###", "[ADD_VM] Throwable: $it") }
48+
.stateIn(viewModelScope, SharingStarted.Eagerly, initialVS)
5649
}
5750

5851
private fun Flow<PartialStateChange>.sendSingleEvent(): Flow<PartialStateChange> {
@@ -69,7 +62,7 @@ internal class AddVM(private val addUser: AddUserUseCase) : ViewModel() {
6962
PartialStateChange.FirstChange.FirstNameChangedFirstTime -> return@onEach
7063
PartialStateChange.FirstChange.LastNameChangedFirstTime -> return@onEach
7164
}
72-
_eventChannel.send(event)
65+
_eventFlow.emit(event)
7366
}
7467
}
7568

@@ -99,6 +92,10 @@ internal class AddVM(private val addUser: AddUserUseCase) : ViewModel() {
9992
)
10093
)
10194
}
95+
.shareIn(
96+
scope = viewModelScope,
97+
started = SharingStarted.WhileSubscribed()
98+
)
10299

103100
val addUserChanges = filterIsInstance<ViewIntent.Submit>()
104101
.withLatestFrom(userFormFlow) { _, userForm -> userForm }

feature-main/src/main/java/com/hoc/flowmvi/ui/main/MainVM.kt

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,9 @@ import com.hoc.flowmvi.domain.usecase.RefreshGetUsersUseCase
99
import com.hoc.flowmvi.domain.usecase.RemoveUserUseCase
1010
import kotlinx.coroutines.ExperimentalCoroutinesApi
1111
import kotlinx.coroutines.FlowPreview
12-
import kotlinx.coroutines.channels.BroadcastChannel
13-
import kotlinx.coroutines.channels.Channel
1412
import kotlinx.coroutines.flow.Flow
15-
import kotlinx.coroutines.flow.MutableStateFlow
13+
import kotlinx.coroutines.flow.MutableSharedFlow
14+
import kotlinx.coroutines.flow.SharingStarted
1615
import kotlinx.coroutines.flow.StateFlow
1716
import kotlinx.coroutines.flow.asFlow
1817
import kotlinx.coroutines.flow.catch
@@ -22,12 +21,13 @@ import kotlinx.coroutines.flow.filterNot
2221
import kotlinx.coroutines.flow.flatMapConcat
2322
import kotlinx.coroutines.flow.flatMapMerge
2423
import kotlinx.coroutines.flow.flow
25-
import kotlinx.coroutines.flow.launchIn
2624
import kotlinx.coroutines.flow.map
2725
import kotlinx.coroutines.flow.merge
2826
import kotlinx.coroutines.flow.onEach
2927
import kotlinx.coroutines.flow.onStart
3028
import kotlinx.coroutines.flow.scan
29+
import kotlinx.coroutines.flow.shareIn
30+
import kotlinx.coroutines.flow.stateIn
3131
import kotlinx.coroutines.flow.take
3232

3333
@Suppress("USELESS_CAST")
@@ -38,32 +38,31 @@ internal class MainVM(
3838
private val refreshGetUsers: RefreshGetUsersUseCase,
3939
private val removeUser: RemoveUserUseCase,
4040
) : ViewModel() {
41-
private val _eventChannel = BroadcastChannel<SingleEvent>(capacity = Channel.BUFFERED)
42-
private val _intentChannel = BroadcastChannel<ViewIntent>(capacity = Channel.BUFFERED)
41+
private val _eventFlow = MutableSharedFlow<SingleEvent>()
42+
private val _intentFlow = MutableSharedFlow<ViewIntent>()
4343

4444
val viewState: StateFlow<ViewState>
45+
val singleEvent: Flow<SingleEvent> get() = _eventFlow
4546

46-
val singleEvent: Flow<SingleEvent>
47-
48-
suspend fun processIntent(intent: ViewIntent) = _intentChannel.send(intent)
47+
suspend fun processIntent(intent: ViewIntent) = _intentFlow.emit(intent)
4948

5049
init {
5150
val initialVS = ViewState.initial()
5251

53-
viewState = MutableStateFlow(initialVS)
54-
singleEvent = _eventChannel.asFlow()
55-
56-
val intentFlow = _intentChannel.asFlow()
57-
merge(
58-
intentFlow.filterIsInstance<ViewIntent.Initial>().take(1),
59-
intentFlow.filterNot { it is ViewIntent.Initial }
52+
viewState = merge(
53+
_intentFlow.filterIsInstance<ViewIntent.Initial>().take(1),
54+
_intentFlow.filterNot { it is ViewIntent.Initial }
6055
)
56+
.shareIn(viewModelScope, SharingStarted.WhileSubscribed())
6157
.toPartialChangeFlow()
6258
.sendSingleEvent()
6359
.scan(initialVS) { vs, change -> change.reduce(vs) }
64-
.onEach { viewState.value = it }
65-
.catch { }
66-
.launchIn(viewModelScope)
60+
.catch { Log.d("###", "[MAIN_VM] Throwable: $it") }
61+
.stateIn(
62+
viewModelScope,
63+
SharingStarted.Eagerly,
64+
initialVS
65+
)
6766
}
6867

6968
private fun Flow<PartialChange>.sendSingleEvent(): Flow<PartialChange> {
@@ -81,7 +80,7 @@ internal class MainVM(
8180
is PartialChange.GetUser.Data -> return@onEach
8281
PartialChange.Refresh.Loading -> return@onEach
8382
}
84-
_eventChannel.send(event)
83+
_eventFlow.emit(event)
8584
}
8685
}
8786

0 commit comments

Comments
 (0)