diff options
-rw-r--r-- | sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/RouteBasedSagas.kt | 5 | ||||
-rw-r--r-- | sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/SagasFramework.kt | 63 |
2 files changed, 12 insertions, 56 deletions
diff --git a/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/RouteBasedSagas.kt b/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/RouteBasedSagas.kt index a014a318..fee5a960 100644 --- a/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/RouteBasedSagas.kt +++ b/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/RouteBasedSagas.kt @@ -1,5 +1,6 @@ package org.luxons.sevenwonders.ui.redux.sagas +import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.map import org.luxons.sevenwonders.client.SevenWondersSession import org.luxons.sevenwonders.ui.redux.* @@ -9,7 +10,9 @@ import org.luxons.sevenwonders.ui.router.Route suspend fun SwSagaContext.gameBrowserSaga(session: SevenWondersSession) { // browser navigation could have brought us here: we should leave the game/lobby ensureNoCurrentGameNorLobby(session) - session.watchGames().map { UpdateGameListAction(it) }.dispatchAll() + session.watchGames() + .map { UpdateGameListAction(it) } + .collect { dispatch(it) } } private suspend fun SwSagaContext.ensureNoCurrentGameNorLobby(session: SevenWondersSession) { diff --git a/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/SagasFramework.kt b/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/SagasFramework.kt index 22377512..ce05ca15 100644 --- a/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/SagasFramework.kt +++ b/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/SagasFramework.kt @@ -1,21 +1,17 @@ package org.luxons.sevenwonders.ui.redux.sagas import kotlinx.coroutines.* -import kotlinx.coroutines.channels.BroadcastChannel -import kotlinx.coroutines.channels.Channel.Factory.BUFFERED -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.* import redux.Middleware import redux.MiddlewareApi import redux.RAction -@OptIn(ExperimentalCoroutinesApi::class) // for BroadcastChannel class SagaManager<S, A : RAction, R>( private val monitor: ((A) -> Unit)? = null, ) { private lateinit var context: SagaContext<S, A, R> - private val actions = BroadcastChannel<A>(BUFFERED) + private val actions = MutableSharedFlow<A>(extraBufferCapacity = 16) fun createMiddleware(): Middleware<S, A, R, A, R> = ::sagasMiddleware @@ -37,7 +33,7 @@ class SagaManager<S, A : RAction, R>( @OptIn(DelicateCoroutinesApi::class) // Ok because almost never suspends - if it does, we have bigger problems private fun handleAction(action: A) { - GlobalScope.launch { actions.send(action) } + GlobalScope.launch { actions.emit(action) } } fun launchSaga(coroutineScope: CoroutineScope, saga: suspend SagaContext<S, A, R>.() -> Unit): Job { @@ -59,10 +55,9 @@ class SagaManager<S, A : RAction, R>( } } -@OptIn(ExperimentalCoroutinesApi::class) // for BroadcastChannel class SagaContext<S, A : RAction, R>( private val reduxApi: MiddlewareApi<S, A, R>, - private val actions: BroadcastChannel<A>, + val reduxActions: SharedFlow<A>, ) { /** * The current redux state. @@ -78,43 +73,13 @@ class SagaContext<S, A : RAction, R>( } /** - * Dispatches all actions from this flow. - */ - suspend fun Flow<A>.dispatchAll() { - collect { - reduxApi.dispatch(it) - } - } - - /** - * Dispatches all actions from this flow in the provided [scope]. - */ - fun Flow<A>.dispatchAllIn(scope: CoroutineScope): Job = scope.launch { dispatchAll() } - - /** - * Executes [handle] on every action dispatched. This runs forever until the current coroutine is cancelled. - */ - suspend fun onEach(handle: suspend SagaContext<S, A, R>.(A) -> Unit) { - val channel = actions.openSubscription() - try { - for (a in channel) { - handle(a) - } - } finally { - channel.cancel() - } - } - - /** * Executes [handle] on every action dispatched of the type [T]. This runs forever until the current coroutine is * cancelled. */ suspend inline fun <reified T : A> onEach( crossinline handle: suspend SagaContext<S, A, R>.(T) -> Unit, - ) = onEach { - if (it is T) { - handle(it) - } + ) { + reduxActions.filterIsInstance<T>().collect { handle(it) } } /** @@ -128,22 +93,10 @@ class SagaContext<S, A : RAction, R>( /** * Suspends until the next action matching the given [predicate] is dispatched, and returns that action. */ - suspend fun next(predicate: (A) -> Boolean): A { - val channel = actions.openSubscription() - try { - for (a in channel) { - if (predicate(a)) { - return a - } - } - } finally { - channel.cancel() - } - error("Actions channel closed before receiving a matching action") - } + suspend fun next(predicate: (A) -> Boolean): A = reduxActions.first { predicate(it) } /** * Suspends until the next action of type [T] is dispatched, and returns that action. */ - suspend inline fun <reified T : A> next(): T = next { it is T } as T + suspend inline fun <reified T : A> next(): T = reduxActions.filterIsInstance<T>().first() } |