From 614aceb1145423ef1612673812da523fdeb1db10 Mon Sep 17 00:00:00 2001 From: Joffrey Bion Date: Wed, 17 Nov 2021 18:17:58 +0100 Subject: Move from BroadcastChannel to SharedFlow in sagas --- .../sevenwonders/ui/redux/sagas/RouteBasedSagas.kt | 5 +- .../sevenwonders/ui/redux/sagas/SagasFramework.kt | 63 +++------------------- 2 files changed, 12 insertions(+), 56 deletions(-) (limited to 'sw-ui') diff --git a/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/RouteBasedSagas.kt b/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/RouteBasedSagas.kt index a014a318..fee5a960 100644 --- a/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/RouteBasedSagas.kt +++ b/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/RouteBasedSagas.kt @@ -1,5 +1,6 @@ package org.luxons.sevenwonders.ui.redux.sagas +import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.map import org.luxons.sevenwonders.client.SevenWondersSession import org.luxons.sevenwonders.ui.redux.* @@ -9,7 +10,9 @@ import org.luxons.sevenwonders.ui.router.Route suspend fun SwSagaContext.gameBrowserSaga(session: SevenWondersSession) { // browser navigation could have brought us here: we should leave the game/lobby ensureNoCurrentGameNorLobby(session) - session.watchGames().map { UpdateGameListAction(it) }.dispatchAll() + session.watchGames() + .map { UpdateGameListAction(it) } + .collect { dispatch(it) } } private suspend fun SwSagaContext.ensureNoCurrentGameNorLobby(session: SevenWondersSession) { diff --git a/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/SagasFramework.kt b/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/SagasFramework.kt index 22377512..ce05ca15 100644 --- a/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/SagasFramework.kt +++ b/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/SagasFramework.kt @@ -1,21 +1,17 @@ package org.luxons.sevenwonders.ui.redux.sagas import kotlinx.coroutines.* -import kotlinx.coroutines.channels.BroadcastChannel -import kotlinx.coroutines.channels.Channel.Factory.BUFFERED -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.* import redux.Middleware import redux.MiddlewareApi import redux.RAction -@OptIn(ExperimentalCoroutinesApi::class) // for BroadcastChannel class SagaManager( private val monitor: ((A) -> Unit)? = null, ) { private lateinit var context: SagaContext - private val actions = BroadcastChannel(BUFFERED) + private val actions = MutableSharedFlow(extraBufferCapacity = 16) fun createMiddleware(): Middleware = ::sagasMiddleware @@ -37,7 +33,7 @@ class SagaManager( @OptIn(DelicateCoroutinesApi::class) // Ok because almost never suspends - if it does, we have bigger problems private fun handleAction(action: A) { - GlobalScope.launch { actions.send(action) } + GlobalScope.launch { actions.emit(action) } } fun launchSaga(coroutineScope: CoroutineScope, saga: suspend SagaContext.() -> Unit): Job { @@ -59,10 +55,9 @@ class SagaManager( } } -@OptIn(ExperimentalCoroutinesApi::class) // for BroadcastChannel class SagaContext( private val reduxApi: MiddlewareApi, - private val actions: BroadcastChannel, + val reduxActions: SharedFlow, ) { /** * The current redux state. @@ -77,44 +72,14 @@ class SagaContext( reduxApi.dispatch(action) } - /** - * Dispatches all actions from this flow. - */ - suspend fun Flow.dispatchAll() { - collect { - reduxApi.dispatch(it) - } - } - - /** - * Dispatches all actions from this flow in the provided [scope]. - */ - fun Flow.dispatchAllIn(scope: CoroutineScope): Job = scope.launch { dispatchAll() } - - /** - * Executes [handle] on every action dispatched. This runs forever until the current coroutine is cancelled. - */ - suspend fun onEach(handle: suspend SagaContext.(A) -> Unit) { - val channel = actions.openSubscription() - try { - for (a in channel) { - handle(a) - } - } finally { - channel.cancel() - } - } - /** * Executes [handle] on every action dispatched of the type [T]. This runs forever until the current coroutine is * cancelled. */ suspend inline fun onEach( crossinline handle: suspend SagaContext.(T) -> Unit, - ) = onEach { - if (it is T) { - handle(it) - } + ) { + reduxActions.filterIsInstance().collect { handle(it) } } /** @@ -128,22 +93,10 @@ class SagaContext( /** * Suspends until the next action matching the given [predicate] is dispatched, and returns that action. */ - suspend fun next(predicate: (A) -> Boolean): A { - val channel = actions.openSubscription() - try { - for (a in channel) { - if (predicate(a)) { - return a - } - } - } finally { - channel.cancel() - } - error("Actions channel closed before receiving a matching action") - } + suspend fun next(predicate: (A) -> Boolean): A = reduxActions.first { predicate(it) } /** * Suspends until the next action of type [T] is dispatched, and returns that action. */ - suspend inline fun next(): T = next { it is T } as T + suspend inline fun next(): T = reduxActions.filterIsInstance().first() } -- cgit