From 9aa36cd199c1d6cf0feb959e3ec6ac0539168cd5 Mon Sep 17 00:00:00 2001 From: Joffrey Bion Date: Tue, 5 May 2020 16:52:27 +0200 Subject: Upgrade to krossbow 0.20.1 (migrate to flows) --- build.gradle.kts | 4 +- sw-client/build.gradle.kts | 2 +- .../sevenwonders/client/SevenWondersClient.kt | 47 ++++++++++++---------- sw-server/build.gradle.kts | 2 +- .../luxons/sevenwonders/server/SevenWondersTest.kt | 37 +++++++---------- .../ui/redux/sagas/GameBrowserSagas.kt | 14 ++++--- .../sevenwonders/ui/redux/sagas/GameSagas.kt | 27 +++++-------- .../sevenwonders/ui/redux/sagas/LobbySagas.kt | 15 +++---- .../luxons/sevenwonders/ui/redux/sagas/Sagas.kt | 6 ++- .../sevenwonders/ui/redux/sagas/SagasFramework.kt | 16 +++++--- 10 files changed, 85 insertions(+), 85 deletions(-) diff --git a/build.gradle.kts b/build.gradle.kts index 7edf6641..e44a8519 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -13,12 +13,14 @@ subprojects { jcenter() } + val compilerArgs = listOf("-Xopt-in=kotlin.RequiresOptIn") tasks.withType { kotlinOptions.jvmTarget = "1.8" + kotlinOptions.freeCompilerArgs += compilerArgs } tasks.withType { - kotlinOptions.freeCompilerArgs = listOf("-Xuse-experimental=kotlin.Experimental") + kotlinOptions.freeCompilerArgs = compilerArgs } afterEvaluate { diff --git a/sw-client/build.gradle.kts b/sw-client/build.gradle.kts index 0b74243d..8b8983c1 100644 --- a/sw-client/build.gradle.kts +++ b/sw-client/build.gradle.kts @@ -3,7 +3,7 @@ plugins { id("org.jlleitschuh.gradle.ktlint") } -val krossbowVersion = "0.10.3" +val krossbowVersion = "0.20.1" kotlin { jvm() diff --git a/sw-client/src/commonMain/kotlin/org/luxons/sevenwonders/client/SevenWondersClient.kt b/sw-client/src/commonMain/kotlin/org/luxons/sevenwonders/client/SevenWondersClient.kt index 4a610a81..480e9d67 100644 --- a/sw-client/src/commonMain/kotlin/org/luxons/sevenwonders/client/SevenWondersClient.kt +++ b/sw-client/src/commonMain/kotlin/org/luxons/sevenwonders/client/SevenWondersClient.kt @@ -1,13 +1,15 @@ package org.luxons.sevenwonders.client +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.first import kotlinx.serialization.DeserializationStrategy import kotlinx.serialization.SerializationStrategy import kotlinx.serialization.builtins.list import kotlinx.serialization.builtins.serializer import org.hildan.krossbow.stomp.StompClient -import org.hildan.krossbow.stomp.StompSubscription import org.hildan.krossbow.stomp.conversions.kxserialization.StompSessionWithKxSerialization import org.hildan.krossbow.stomp.conversions.kxserialization.convertAndSend +import org.hildan.krossbow.stomp.conversions.kxserialization.subscribe import org.hildan.krossbow.stomp.conversions.kxserialization.withJsonConversions import org.hildan.krossbow.stomp.sendEmptyMsg import org.luxons.sevenwonders.model.CustomizableSettings @@ -44,17 +46,14 @@ private suspend inline fun StompSessionWithKx ): U { val sub = subscribe(receiveDestination, deserializer) convertAndSend(sendDestination, payload, serializer) - val msg = sub.messages.receive() - sub.unsubscribe() - return msg + return sub.first() } class SevenWondersSession(private val stompSession: StompSessionWithKxSerialization) { suspend fun disconnect() = stompSession.disconnect() - suspend fun watchErrors(): StompSubscription = - stompSession.subscribe("/user/queue/errors", ErrorDTO.serializer()) + fun watchErrors(): Flow = stompSession.subscribe("/user/queue/errors", ErrorDTO.serializer()) suspend fun chooseName(displayName: String): ConnectedPlayer = stompSession.request( sendDestination = "/app/chooseName", @@ -64,7 +63,7 @@ class SevenWondersSession(private val stompSession: StompSessionWithKxSerializat deserializer = ConnectedPlayer.serializer() ) - suspend fun watchGames(): StompSubscription> = + fun watchGames(): Flow> = stompSession.subscribe("/topic/games", LobbyDTO.serializer().list) suspend fun createGame(gameName: String): LobbyDTO = stompSession.request( @@ -95,42 +94,46 @@ class SevenWondersSession(private val stompSession: StompSessionWithKxSerializat stompSession.convertAndSend("/app/lobby/updateSettings", UpdateSettingsAction(settings), UpdateSettingsAction.serializer()) } - suspend fun watchLobbyUpdates(): StompSubscription = + fun watchLobbyUpdates(): Flow = stompSession.subscribe("/user/queue/lobby/updated", LobbyDTO.serializer()) suspend fun awaitGameStart(gameId: Long): PlayerTurnInfo { - val gameStartSubscription = stompSession.subscribe("/user/queue/lobby/$gameId/started", PlayerTurnInfo.serializer()) - val turnInfo = gameStartSubscription.messages.receive() - gameStartSubscription.unsubscribe() - return turnInfo + val startEvents = stompSession.subscribe("/user/queue/lobby/$gameId/started", PlayerTurnInfo.serializer()) + return startEvents.first() } suspend fun startGame() { stompSession.sendEmptyMsg("/app/lobby/startGame") } - suspend fun watchPlayerReady(gameId: Long): StompSubscription = + fun watchPlayerReady(gameId: Long): Flow = stompSession.subscribe("/topic/game/$gameId/playerReady", String.serializer()) - suspend fun watchPreparedCards(gameId: Long): StompSubscription = + fun watchPreparedCards(gameId: Long): Flow = stompSession.subscribe("/topic/game/$gameId/prepared", PreparedCard.serializer()) - suspend fun watchTurns(): StompSubscription = + fun watchTurns(): Flow = stompSession.subscribe("/user/queue/game/turn", PlayerTurnInfo.serializer()) suspend fun sayReady() { stompSession.sendEmptyMsg("/app/game/sayReady") } - suspend fun prepareMove(move: PlayerMove): PlayerMove = stompSession.request( - sendDestination = "/app/game/prepareMove", - receiveDestination = "/user/queue/game/preparedMove", - payload = PrepareMoveAction(move), - serializer = PrepareMoveAction.serializer(), - deserializer = PlayerMove.serializer() - ) + fun watchOwnMoves(): Flow = + stompSession.subscribe(destination = "/user/queue/game/preparedMove", deserializer = PlayerMove.serializer()) + + suspend fun prepareMove(move: PlayerMove) { + stompSession.convertAndSend( + destination = "/app/game/prepareMove", + body = PrepareMoveAction(move), + serializer = PrepareMoveAction.serializer() + ) + } suspend fun unprepareMove() { stompSession.sendEmptyMsg("/app/game/unprepareMove") } + +// suspend fun watchGameEnd() { +// } } diff --git a/sw-server/build.gradle.kts b/sw-server/build.gradle.kts index fcf3ffa4..7d1ee0ae 100644 --- a/sw-server/build.gradle.kts +++ b/sw-server/build.gradle.kts @@ -32,7 +32,7 @@ dependencies { testImplementation("org.springframework.boot:spring-boot-starter-test") testImplementation("org.hildan.jackstomp:jackstomp:2.0.0") testImplementation("com.fasterxml.jackson.module:jackson-module-kotlin") - testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.0-M1") + testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.5") } // packages the frontend app within the jar diff --git a/sw-server/src/test/kotlin/org/luxons/sevenwonders/server/SevenWondersTest.kt b/sw-server/src/test/kotlin/org/luxons/sevenwonders/server/SevenWondersTest.kt index f34234f7..c8a287f7 100644 --- a/sw-server/src/test/kotlin/org/luxons/sevenwonders/server/SevenWondersTest.kt +++ b/sw-server/src/test/kotlin/org/luxons/sevenwonders/server/SevenWondersTest.kt @@ -1,5 +1,7 @@ package org.luxons.sevenwonders.server +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.flow.produceIn import kotlinx.coroutines.launch import kotlinx.coroutines.withTimeout import kotlinx.coroutines.withTimeoutOrNull @@ -86,10 +88,11 @@ class SevenWondersTest { disconnect(ownerSession) } + @OptIn(FlowPreview::class) @Test fun createGame_seenByConnectedPlayers() = runAsyncTest { val otherSession = newPlayer("OtherPlayer") - val games = otherSession.watchGames().messages + val games = otherSession.watchGames().produceIn(this) var receivedLobbies = withTimeout(500) { games.receive() } assertNotNull(receivedLobbies) @@ -109,6 +112,7 @@ class SevenWondersTest { disconnect(ownerSession, otherSession) } + @OptIn(FlowPreview::class) @Test fun startGame_3players() = runAsyncTest { val session1 = newPlayer("Player1") @@ -120,27 +124,16 @@ class SevenWondersTest { val session3 = newPlayer("Player3") session3.joinGame(lobby.id) - val gameStart1 = launch { session1.awaitGameStart(lobby.id) } - val gameStart2 = launch { session2.awaitGameStart(lobby.id) } - val gameStart3 = launch { session3.awaitGameStart(lobby.id) } + listOf(session1, session2, session3).forEach { session -> + launch { + session.awaitGameStart(lobby.id) + val turns = session.watchTurns().produceIn(this) + session.sayReady() + val turn = turns.receive() + assertNotNull(turn) + session.disconnect() + } + } session1.startGame() - gameStart1.join() - gameStart2.join() - gameStart3.join() - - val turns1 = session1.watchTurns().messages - val turns2 = session2.watchTurns().messages - val turns3 = session3.watchTurns().messages - session1.sayReady() - session2.sayReady() - session3.sayReady() - val turn1 = turns1.receive() - val turn2 = turns2.receive() - val turn3 = turns3.receive() - assertNotNull(turn1) - assertNotNull(turn2) - assertNotNull(turn3) - - disconnect(session1, session2, session3) } } diff --git a/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/GameBrowserSagas.kt b/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/GameBrowserSagas.kt index 7806bc98..aa81ae55 100644 --- a/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/GameBrowserSagas.kt +++ b/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/GameBrowserSagas.kt @@ -1,8 +1,9 @@ package org.luxons.sevenwonders.ui.redux.sagas +import kotlinx.coroutines.cancelAndJoin import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.flow.map import kotlinx.coroutines.launch -import org.hildan.krossbow.stomp.StompSubscription import org.luxons.sevenwonders.client.SevenWondersSession import org.luxons.sevenwonders.model.api.LobbyDTO import org.luxons.sevenwonders.ui.redux.EnterLobbyAction @@ -23,17 +24,18 @@ private class GameBrowserSaga( ) { suspend fun run() { coroutineScope { - val gamesSubscription = session.watchGames() - launch { dispatchGameUpdates(gamesSubscription) } + val gamesDispatch = launch { dispatchGameUpdates() } val lobby = awaitCreateOrJoinGame() - gamesSubscription.unsubscribe() + gamesDispatch.cancelAndJoin() sagaContext.dispatch(EnterLobbyAction(lobby)) sagaContext.dispatch(Navigate(Route.LOBBY)) } } - private suspend fun dispatchGameUpdates(gamesSubscription: StompSubscription>) { - sagaContext.dispatchAll(gamesSubscription.messages) { UpdateGameListAction(it) } + private suspend fun dispatchGameUpdates() { + with(sagaContext) { + session.watchGames().map { UpdateGameListAction(it) }.dispatchAll() + } } private suspend fun awaitCreateOrJoinGame(): LobbyDTO = awaitFirst(this::awaitCreateGame, this::awaitJoinGame) diff --git a/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/GameSagas.kt b/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/GameSagas.kt index 38bb7cb7..48592778 100644 --- a/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/GameSagas.kt +++ b/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/GameSagas.kt @@ -1,6 +1,7 @@ package org.luxons.sevenwonders.ui.redux.sagas import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.flow.map import kotlinx.coroutines.launch import org.luxons.sevenwonders.client.SevenWondersSession import org.luxons.sevenwonders.ui.redux.PlayerReadyEvent @@ -14,22 +15,16 @@ import org.luxons.sevenwonders.ui.redux.TurnInfoEvent suspend fun SwSagaContext.gameSaga(session: SevenWondersSession) { val game = getState().gameState ?: error("Game saga run without a current game") coroutineScope { - val playerReadySub = session.watchPlayerReady(game.id) - val preparedCardsSub = session.watchPreparedCards(game.id) - val turnInfoSub = session.watchTurns() - val sayReadyJob = launch { onEach { session.sayReady() } } - val unprepareJob = launch { onEach { session.unprepareMove() } } - val prepareMoveJob = launch { - onEach { - val move = session.prepareMove(it.move) - dispatch(PreparedMoveEvent(move)) - } - } - launch { dispatchAll(playerReadySub.messages) { PlayerReadyEvent(it) } } - launch { dispatchAll(preparedCardsSub.messages) { PreparedCardEvent(it) } } - launch { dispatchAll(turnInfoSub.messages) { TurnInfoEvent(it) } } - // TODO await game end - // TODO unsubscribe all subs, cancel all jobs + session.watchPlayerReady(game.id).map { PlayerReadyEvent(it) }.dispatchAllIn(this) + session.watchPreparedCards(game.id).map { PreparedCardEvent(it) }.dispatchAllIn(this) + session.watchOwnMoves().map { PreparedMoveEvent(it) }.dispatchAllIn(this) + session.watchTurns().map { TurnInfoEvent(it) }.dispatchAllIn(this) + + launch { onEach { session.sayReady() } } + launch { onEach { session.prepareMove(it.move) } } + launch { onEach { session.unprepareMove() } } + + // TODO await game end and cancel this scope to unsubscribe everything } console.log("End of game saga") } diff --git a/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/LobbySagas.kt b/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/LobbySagas.kt index c3996af7..e2bf82c5 100644 --- a/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/LobbySagas.kt +++ b/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/LobbySagas.kt @@ -1,10 +1,9 @@ package org.luxons.sevenwonders.ui.redux.sagas import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.flow.map import kotlinx.coroutines.launch -import org.hildan.krossbow.stomp.StompSubscription import org.luxons.sevenwonders.client.SevenWondersSession -import org.luxons.sevenwonders.model.api.LobbyDTO import org.luxons.sevenwonders.ui.redux.EnterGameAction import org.luxons.sevenwonders.ui.redux.RequestLeaveLobby import org.luxons.sevenwonders.ui.redux.RequestStartGame @@ -17,19 +16,21 @@ suspend fun SwSagaContext.lobbySaga(session: SevenWondersSession) { val lobby = getState().currentLobby ?: error("Lobby saga run without a current lobby") coroutineScope { val lobbyUpdatesSubscription = session.watchLobbyUpdates() - launch { watchLobbyUpdates(lobbyUpdatesSubscription) } + .map { UpdateLobbyAction(it) } + .dispatchAllIn(this) + val startGameJob = launch { awaitStartGame(session) } awaitFirst( { awaitLeaveLobby(session) - lobbyUpdatesSubscription.unsubscribe() + lobbyUpdatesSubscription.cancel() startGameJob.cancel() dispatch(Navigate(Route.GAME_BROWSER)) }, { awaitGameStart(session, lobby.id) - lobbyUpdatesSubscription.unsubscribe() + lobbyUpdatesSubscription.cancel() startGameJob.cancel() dispatch(Navigate(Route.GAME)) } @@ -37,10 +38,6 @@ suspend fun SwSagaContext.lobbySaga(session: SevenWondersSession) { } } -private suspend fun SwSagaContext.watchLobbyUpdates(lobbyUpdatesSubscription: StompSubscription) { - dispatchAll(lobbyUpdatesSubscription.messages) { UpdateLobbyAction(it) } -} - private suspend fun SwSagaContext.awaitGameStart(session: SevenWondersSession, lobbyId: Long) { val turnInfo = session.awaitGameStart(lobbyId) val lobby = getState().currentLobby!! diff --git a/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/Sagas.kt b/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/Sagas.kt index 3c66d31e..565cf7ec 100644 --- a/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/Sagas.kt +++ b/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/Sagas.kt @@ -1,6 +1,9 @@ package org.luxons.sevenwonders.ui.redux.sagas +import kotlinx.coroutines.CoroutineStart +import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.flow.collect import kotlinx.coroutines.launch import org.luxons.sevenwonders.client.SevenWondersClient import org.luxons.sevenwonders.client.SevenWondersSession @@ -38,8 +41,7 @@ suspend fun SwSagaContext.rootSaga() = coroutineScope { } private suspend fun serverErrorSaga(session: SevenWondersSession) { - val errorsSub = session.watchErrors() - for (err in errorsSub.messages) { + session.watchErrors().collect { err -> // TODO use blueprintjs toaster console.error("${err.code}: ${err.message}") console.error(JSON.stringify(err)) diff --git a/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/SagasFramework.kt b/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/SagasFramework.kt index 1a57708e..df00f43f 100644 --- a/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/SagasFramework.kt +++ b/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/SagasFramework.kt @@ -6,7 +6,8 @@ import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.Job import kotlinx.coroutines.channels.BroadcastChannel -import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.collect import kotlinx.coroutines.launch import redux.Middleware import redux.MiddlewareApi @@ -79,14 +80,19 @@ class SagaContext( } /** - * Dispatches an action given by [createAction] for each message received in [channel]. + * Dispatches all actions from this flow. */ - suspend fun dispatchAll(channel: ReceiveChannel, createAction: (T) -> A) { - for (msg in channel) { - reduxApi.dispatch(createAction(msg)) + suspend fun Flow.dispatchAll() { + collect { + reduxApi.dispatch(it) } } + /** + * Dispatches all actions from this flow in the provided [scope]. + */ + fun Flow.dispatchAllIn(scope: CoroutineScope): Job = scope.launch { dispatchAll() } + /** * Executes [handle] on every action dispatched. This runs forever until the current coroutine is cancelled. */ -- cgit