@@ -18,19 +18,17 @@ import com.hoc.flowmvi.domain.model.UserValidationError
18
18
import com.hoc.flowmvi.domain.repository.UserRepository
19
19
import com.hoc081098.flowext.flowFromSuspend
20
20
import com.hoc081098.flowext.retryWithExponentialBackoff
21
+ import com.hoc081098.flowext.scanWith
21
22
import java.io.IOException
22
23
import kotlin.time.Duration.Companion.milliseconds
23
24
import kotlin.time.ExperimentalTime
24
25
import kotlinx.coroutines.ExperimentalCoroutinesApi
25
26
import kotlinx.coroutines.FlowPreview
26
- import kotlinx.coroutines.flow.Flow
27
27
import kotlinx.coroutines.flow.MutableSharedFlow
28
28
import kotlinx.coroutines.flow.catch
29
29
import kotlinx.coroutines.flow.first
30
- import kotlinx.coroutines.flow.flatMapConcat
31
30
import kotlinx.coroutines.flow.map
32
31
import kotlinx.coroutines.flow.onEach
33
- import kotlinx.coroutines.flow.scan
34
32
import kotlinx.coroutines.withContext
35
33
import timber.log.Timber
36
34
@@ -44,25 +42,25 @@ internal class UserRepositoryImpl(
44
42
private val domainToBody : Mapper <User , UserBody >,
45
43
private val errorMapper : Mapper <Throwable , UserError >,
46
44
) : UserRepository {
47
- private sealed class Change {
45
+ private sealed interface Change {
48
46
class Removed (
49
47
val removed : User ,
50
- ) : Change()
48
+ ) : Change
51
49
52
50
class Refreshed (
53
51
val user : List <User >,
54
- ) : Change()
52
+ ) : Change
55
53
56
54
class Added (
57
55
val user : User ,
58
- ) : Change()
56
+ ) : Change
59
57
}
60
58
61
59
private val changesFlow = MutableSharedFlow <Change >(extraBufferCapacity = 64 )
62
60
63
61
private suspend inline fun sendChange (change : Change ) = changesFlow.emit(change)
64
62
65
- private fun getUsersFromRemote (): Flow < List <User > > =
63
+ private suspend fun getUsersFromRemoteWithRetry (): List <User > =
66
64
flowFromSuspend {
67
65
Timber .d(" [USER_REPO] getUsersFromRemote ..." )
68
66
@@ -79,19 +77,17 @@ internal class UserRepositoryImpl(
79
77
initialDelay = 500 .milliseconds,
80
78
factor = 2.0 ,
81
79
) { it is IOException }
80
+ .first()
82
81
83
82
override fun getUsers () =
84
- getUsersFromRemote()
85
- .flatMapConcat { initial ->
86
- changesFlow
87
- .onEach { Timber .d(" [USER_REPO] Change=$it " ) }
88
- .scan(initial) { acc, change ->
89
- when (change) {
90
- is Change .Removed -> acc.filter { it.id != change.removed.id }
91
- is Change .Refreshed -> change.user
92
- is Change .Added -> acc + change.user
93
- }
94
- }
83
+ changesFlow
84
+ .onEach { Timber .d(" [USER_REPO] Change=$it " ) }
85
+ .scanWith(::getUsersFromRemoteWithRetry) { acc, change ->
86
+ when (change) {
87
+ is Change .Removed -> acc.filter { it.id != change.removed.id }
88
+ is Change .Refreshed -> change.user
89
+ is Change .Added -> acc + change.user
90
+ }
95
91
}.onEach { Timber .d(" [USER_REPO] Emit users.size=${it.size} " ) }
96
92
.map { it.right().leftWiden<UserError , _ , _ >() }
97
93
.catch {
@@ -100,9 +96,9 @@ internal class UserRepositoryImpl(
100
96
}
101
97
102
98
override suspend fun refresh () =
103
- catchEither { getUsersFromRemote().first () }
99
+ catchEither { getUsersFromRemoteWithRetry () }
104
100
.onRight { sendChange(Change .Refreshed (it)) }
105
- .map { }
101
+ .map {}
106
102
.onLeft { logError(it, " refresh" ) }
107
103
.mapLeft(errorMapper)
108
104
0 commit comments