summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoffrey Bion <joffrey.bion@gmail.com>2021-11-17 18:17:58 +0100
committerJoffrey Bion <joffrey.bion@gmail.com>2021-11-18 01:40:53 +0100
commit614aceb1145423ef1612673812da523fdeb1db10 (patch)
treea248c3a2c709999ab05db6682ca6bcabafa772f8
parentAdd allWarningsAsErrors for stricter build (diff)
downloadseven-wonders-614aceb1145423ef1612673812da523fdeb1db10.tar.gz
seven-wonders-614aceb1145423ef1612673812da523fdeb1db10.tar.bz2
seven-wonders-614aceb1145423ef1612673812da523fdeb1db10.zip
Move from BroadcastChannel to SharedFlow in sagas
-rw-r--r--sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/RouteBasedSagas.kt5
-rw-r--r--sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/SagasFramework.kt63
2 files changed, 12 insertions, 56 deletions
diff --git a/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/RouteBasedSagas.kt b/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/RouteBasedSagas.kt
index a014a318..fee5a960 100644
--- a/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/RouteBasedSagas.kt
+++ b/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/RouteBasedSagas.kt
@@ -1,5 +1,6 @@
package org.luxons.sevenwonders.ui.redux.sagas
+import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.map
import org.luxons.sevenwonders.client.SevenWondersSession
import org.luxons.sevenwonders.ui.redux.*
@@ -9,7 +10,9 @@ import org.luxons.sevenwonders.ui.router.Route
suspend fun SwSagaContext.gameBrowserSaga(session: SevenWondersSession) {
// browser navigation could have brought us here: we should leave the game/lobby
ensureNoCurrentGameNorLobby(session)
- session.watchGames().map { UpdateGameListAction(it) }.dispatchAll()
+ session.watchGames()
+ .map { UpdateGameListAction(it) }
+ .collect { dispatch(it) }
}
private suspend fun SwSagaContext.ensureNoCurrentGameNorLobby(session: SevenWondersSession) {
diff --git a/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/SagasFramework.kt b/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/SagasFramework.kt
index 22377512..ce05ca15 100644
--- a/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/SagasFramework.kt
+++ b/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/SagasFramework.kt
@@ -1,21 +1,17 @@
package org.luxons.sevenwonders.ui.redux.sagas
import kotlinx.coroutines.*
-import kotlinx.coroutines.channels.BroadcastChannel
-import kotlinx.coroutines.channels.Channel.Factory.BUFFERED
-import kotlinx.coroutines.flow.Flow
-import kotlinx.coroutines.flow.collect
+import kotlinx.coroutines.flow.*
import redux.Middleware
import redux.MiddlewareApi
import redux.RAction
-@OptIn(ExperimentalCoroutinesApi::class) // for BroadcastChannel
class SagaManager<S, A : RAction, R>(
private val monitor: ((A) -> Unit)? = null,
) {
private lateinit var context: SagaContext<S, A, R>
- private val actions = BroadcastChannel<A>(BUFFERED)
+ private val actions = MutableSharedFlow<A>(extraBufferCapacity = 16)
fun createMiddleware(): Middleware<S, A, R, A, R> = ::sagasMiddleware
@@ -37,7 +33,7 @@ class SagaManager<S, A : RAction, R>(
@OptIn(DelicateCoroutinesApi::class) // Ok because almost never suspends - if it does, we have bigger problems
private fun handleAction(action: A) {
- GlobalScope.launch { actions.send(action) }
+ GlobalScope.launch { actions.emit(action) }
}
fun launchSaga(coroutineScope: CoroutineScope, saga: suspend SagaContext<S, A, R>.() -> Unit): Job {
@@ -59,10 +55,9 @@ class SagaManager<S, A : RAction, R>(
}
}
-@OptIn(ExperimentalCoroutinesApi::class) // for BroadcastChannel
class SagaContext<S, A : RAction, R>(
private val reduxApi: MiddlewareApi<S, A, R>,
- private val actions: BroadcastChannel<A>,
+ val reduxActions: SharedFlow<A>,
) {
/**
* The current redux state.
@@ -78,43 +73,13 @@ class SagaContext<S, A : RAction, R>(
}
/**
- * Dispatches all actions from this flow.
- */
- suspend fun Flow<A>.dispatchAll() {
- collect {
- reduxApi.dispatch(it)
- }
- }
-
- /**
- * Dispatches all actions from this flow in the provided [scope].
- */
- fun Flow<A>.dispatchAllIn(scope: CoroutineScope): Job = scope.launch { dispatchAll() }
-
- /**
- * Executes [handle] on every action dispatched. This runs forever until the current coroutine is cancelled.
- */
- suspend fun onEach(handle: suspend SagaContext<S, A, R>.(A) -> Unit) {
- val channel = actions.openSubscription()
- try {
- for (a in channel) {
- handle(a)
- }
- } finally {
- channel.cancel()
- }
- }
-
- /**
* Executes [handle] on every action dispatched of the type [T]. This runs forever until the current coroutine is
* cancelled.
*/
suspend inline fun <reified T : A> onEach(
crossinline handle: suspend SagaContext<S, A, R>.(T) -> Unit,
- ) = onEach {
- if (it is T) {
- handle(it)
- }
+ ) {
+ reduxActions.filterIsInstance<T>().collect { handle(it) }
}
/**
@@ -128,22 +93,10 @@ class SagaContext<S, A : RAction, R>(
/**
* Suspends until the next action matching the given [predicate] is dispatched, and returns that action.
*/
- suspend fun next(predicate: (A) -> Boolean): A {
- val channel = actions.openSubscription()
- try {
- for (a in channel) {
- if (predicate(a)) {
- return a
- }
- }
- } finally {
- channel.cancel()
- }
- error("Actions channel closed before receiving a matching action")
- }
+ suspend fun next(predicate: (A) -> Boolean): A = reduxActions.first { predicate(it) }
/**
* Suspends until the next action of type [T] is dispatched, and returns that action.
*/
- suspend inline fun <reified T : A> next(): T = next { it is T } as T
+ suspend inline fun <reified T : A> next(): T = reduxActions.filterIsInstance<T>().first()
}
bgstack15