From 9aa36cd199c1d6cf0feb959e3ec6ac0539168cd5 Mon Sep 17 00:00:00 2001 From: Joffrey Bion Date: Tue, 5 May 2020 16:52:27 +0200 Subject: Upgrade to krossbow 0.20.1 (migrate to flows) --- .../ui/redux/sagas/GameBrowserSagas.kt | 14 ++++++----- .../sevenwonders/ui/redux/sagas/GameSagas.kt | 27 +++++++++------------- .../sevenwonders/ui/redux/sagas/LobbySagas.kt | 15 +++++------- .../luxons/sevenwonders/ui/redux/sagas/Sagas.kt | 6 +++-- .../sevenwonders/ui/redux/sagas/SagasFramework.kt | 16 +++++++++---- 5 files changed, 40 insertions(+), 38 deletions(-) (limited to 'sw-ui/src/main') diff --git a/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/GameBrowserSagas.kt b/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/GameBrowserSagas.kt index 7806bc98..aa81ae55 100644 --- a/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/GameBrowserSagas.kt +++ b/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/GameBrowserSagas.kt @@ -1,8 +1,9 @@ package org.luxons.sevenwonders.ui.redux.sagas +import kotlinx.coroutines.cancelAndJoin import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.flow.map import kotlinx.coroutines.launch -import org.hildan.krossbow.stomp.StompSubscription import org.luxons.sevenwonders.client.SevenWondersSession import org.luxons.sevenwonders.model.api.LobbyDTO import org.luxons.sevenwonders.ui.redux.EnterLobbyAction @@ -23,17 +24,18 @@ private class GameBrowserSaga( ) { suspend fun run() { coroutineScope { - val gamesSubscription = session.watchGames() - launch { dispatchGameUpdates(gamesSubscription) } + val gamesDispatch = launch { dispatchGameUpdates() } val lobby = awaitCreateOrJoinGame() - gamesSubscription.unsubscribe() + gamesDispatch.cancelAndJoin() sagaContext.dispatch(EnterLobbyAction(lobby)) sagaContext.dispatch(Navigate(Route.LOBBY)) } } - private suspend fun dispatchGameUpdates(gamesSubscription: StompSubscription>) { - sagaContext.dispatchAll(gamesSubscription.messages) { UpdateGameListAction(it) } + private suspend fun dispatchGameUpdates() { + with(sagaContext) { + session.watchGames().map { UpdateGameListAction(it) }.dispatchAll() + } } private suspend fun awaitCreateOrJoinGame(): LobbyDTO = awaitFirst(this::awaitCreateGame, this::awaitJoinGame) diff --git a/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/GameSagas.kt b/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/GameSagas.kt index 38bb7cb7..48592778 100644 --- a/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/GameSagas.kt +++ b/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/GameSagas.kt @@ -1,6 +1,7 @@ package org.luxons.sevenwonders.ui.redux.sagas import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.flow.map import kotlinx.coroutines.launch import org.luxons.sevenwonders.client.SevenWondersSession import org.luxons.sevenwonders.ui.redux.PlayerReadyEvent @@ -14,22 +15,16 @@ import org.luxons.sevenwonders.ui.redux.TurnInfoEvent suspend fun SwSagaContext.gameSaga(session: SevenWondersSession) { val game = getState().gameState ?: error("Game saga run without a current game") coroutineScope { - val playerReadySub = session.watchPlayerReady(game.id) - val preparedCardsSub = session.watchPreparedCards(game.id) - val turnInfoSub = session.watchTurns() - val sayReadyJob = launch { onEach { session.sayReady() } } - val unprepareJob = launch { onEach { session.unprepareMove() } } - val prepareMoveJob = launch { - onEach { - val move = session.prepareMove(it.move) - dispatch(PreparedMoveEvent(move)) - } - } - launch { dispatchAll(playerReadySub.messages) { PlayerReadyEvent(it) } } - launch { dispatchAll(preparedCardsSub.messages) { PreparedCardEvent(it) } } - launch { dispatchAll(turnInfoSub.messages) { TurnInfoEvent(it) } } - // TODO await game end - // TODO unsubscribe all subs, cancel all jobs + session.watchPlayerReady(game.id).map { PlayerReadyEvent(it) }.dispatchAllIn(this) + session.watchPreparedCards(game.id).map { PreparedCardEvent(it) }.dispatchAllIn(this) + session.watchOwnMoves().map { PreparedMoveEvent(it) }.dispatchAllIn(this) + session.watchTurns().map { TurnInfoEvent(it) }.dispatchAllIn(this) + + launch { onEach { session.sayReady() } } + launch { onEach { session.prepareMove(it.move) } } + launch { onEach { session.unprepareMove() } } + + // TODO await game end and cancel this scope to unsubscribe everything } console.log("End of game saga") } diff --git a/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/LobbySagas.kt b/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/LobbySagas.kt index c3996af7..e2bf82c5 100644 --- a/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/LobbySagas.kt +++ b/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/LobbySagas.kt @@ -1,10 +1,9 @@ package org.luxons.sevenwonders.ui.redux.sagas import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.flow.map import kotlinx.coroutines.launch -import org.hildan.krossbow.stomp.StompSubscription import org.luxons.sevenwonders.client.SevenWondersSession -import org.luxons.sevenwonders.model.api.LobbyDTO import org.luxons.sevenwonders.ui.redux.EnterGameAction import org.luxons.sevenwonders.ui.redux.RequestLeaveLobby import org.luxons.sevenwonders.ui.redux.RequestStartGame @@ -17,19 +16,21 @@ suspend fun SwSagaContext.lobbySaga(session: SevenWondersSession) { val lobby = getState().currentLobby ?: error("Lobby saga run without a current lobby") coroutineScope { val lobbyUpdatesSubscription = session.watchLobbyUpdates() - launch { watchLobbyUpdates(lobbyUpdatesSubscription) } + .map { UpdateLobbyAction(it) } + .dispatchAllIn(this) + val startGameJob = launch { awaitStartGame(session) } awaitFirst( { awaitLeaveLobby(session) - lobbyUpdatesSubscription.unsubscribe() + lobbyUpdatesSubscription.cancel() startGameJob.cancel() dispatch(Navigate(Route.GAME_BROWSER)) }, { awaitGameStart(session, lobby.id) - lobbyUpdatesSubscription.unsubscribe() + lobbyUpdatesSubscription.cancel() startGameJob.cancel() dispatch(Navigate(Route.GAME)) } @@ -37,10 +38,6 @@ suspend fun SwSagaContext.lobbySaga(session: SevenWondersSession) { } } -private suspend fun SwSagaContext.watchLobbyUpdates(lobbyUpdatesSubscription: StompSubscription) { - dispatchAll(lobbyUpdatesSubscription.messages) { UpdateLobbyAction(it) } -} - private suspend fun SwSagaContext.awaitGameStart(session: SevenWondersSession, lobbyId: Long) { val turnInfo = session.awaitGameStart(lobbyId) val lobby = getState().currentLobby!! diff --git a/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/Sagas.kt b/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/Sagas.kt index 3c66d31e..565cf7ec 100644 --- a/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/Sagas.kt +++ b/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/Sagas.kt @@ -1,6 +1,9 @@ package org.luxons.sevenwonders.ui.redux.sagas +import kotlinx.coroutines.CoroutineStart +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.flow.collect import kotlinx.coroutines.launch import org.luxons.sevenwonders.client.SevenWondersClient import org.luxons.sevenwonders.client.SevenWondersSession @@ -38,8 +41,7 @@ suspend fun SwSagaContext.rootSaga() = coroutineScope { } private suspend fun serverErrorSaga(session: SevenWondersSession) { - val errorsSub = session.watchErrors() - for (err in errorsSub.messages) { + session.watchErrors().collect { err -> // TODO use blueprintjs toaster console.error("${err.code}: ${err.message}") console.error(JSON.stringify(err)) diff --git a/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/SagasFramework.kt b/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/SagasFramework.kt index 1a57708e..df00f43f 100644 --- a/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/SagasFramework.kt +++ b/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/SagasFramework.kt @@ -6,7 +6,8 @@ import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.Job import kotlinx.coroutines.channels.BroadcastChannel -import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.collect import kotlinx.coroutines.launch import redux.Middleware import redux.MiddlewareApi @@ -79,14 +80,19 @@ class SagaContext( } /** - * Dispatches an action given by [createAction] for each message received in [channel]. + * Dispatches all actions from this flow. */ - suspend fun dispatchAll(channel: ReceiveChannel, createAction: (T) -> A) { - for (msg in channel) { - reduxApi.dispatch(createAction(msg)) + suspend fun Flow.dispatchAll() { + collect { + reduxApi.dispatch(it) } } + /** + * Dispatches all actions from this flow in the provided [scope]. + */ + fun Flow.dispatchAllIn(scope: CoroutineScope): Job = scope.launch { dispatchAll() } + /** * Executes [handle] on every action dispatched. This runs forever until the current coroutine is cancelled. */ -- cgit