summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoffrey Bion <joffrey.bion@booking.com>2020-05-05 16:52:27 +0200
committerJoffrey Bion <joffrey.bion@booking.com>2020-05-12 09:22:25 +0200
commit9aa36cd199c1d6cf0feb959e3ec6ac0539168cd5 (patch)
tree03b40254778add686a1aaf7a263f099e748097fc
parentMove score to common model (to enable passing it to client) (diff)
downloadseven-wonders-9aa36cd199c1d6cf0feb959e3ec6ac0539168cd5.tar.gz
seven-wonders-9aa36cd199c1d6cf0feb959e3ec6ac0539168cd5.tar.bz2
seven-wonders-9aa36cd199c1d6cf0feb959e3ec6ac0539168cd5.zip
Upgrade to krossbow 0.20.1 (migrate to flows)
-rw-r--r--build.gradle.kts4
-rw-r--r--sw-client/build.gradle.kts2
-rw-r--r--sw-client/src/commonMain/kotlin/org/luxons/sevenwonders/client/SevenWondersClient.kt47
-rw-r--r--sw-server/build.gradle.kts2
-rw-r--r--sw-server/src/test/kotlin/org/luxons/sevenwonders/server/SevenWondersTest.kt37
-rw-r--r--sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/GameBrowserSagas.kt14
-rw-r--r--sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/GameSagas.kt27
-rw-r--r--sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/LobbySagas.kt15
-rw-r--r--sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/Sagas.kt6
-rw-r--r--sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/SagasFramework.kt16
10 files changed, 85 insertions, 85 deletions
diff --git a/build.gradle.kts b/build.gradle.kts
index 7edf6641..e44a8519 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -13,12 +13,14 @@ subprojects {
jcenter()
}
+ val compilerArgs = listOf("-Xopt-in=kotlin.RequiresOptIn")
tasks.withType<org.jetbrains.kotlin.gradle.tasks.KotlinCompile> {
kotlinOptions.jvmTarget = "1.8"
+ kotlinOptions.freeCompilerArgs += compilerArgs
}
tasks.withType<org.jetbrains.kotlin.gradle.tasks.Kotlin2JsCompile> {
- kotlinOptions.freeCompilerArgs = listOf("-Xuse-experimental=kotlin.Experimental")
+ kotlinOptions.freeCompilerArgs = compilerArgs
}
afterEvaluate {
diff --git a/sw-client/build.gradle.kts b/sw-client/build.gradle.kts
index 0b74243d..8b8983c1 100644
--- a/sw-client/build.gradle.kts
+++ b/sw-client/build.gradle.kts
@@ -3,7 +3,7 @@ plugins {
id("org.jlleitschuh.gradle.ktlint")
}
-val krossbowVersion = "0.10.3"
+val krossbowVersion = "0.20.1"
kotlin {
jvm()
diff --git a/sw-client/src/commonMain/kotlin/org/luxons/sevenwonders/client/SevenWondersClient.kt b/sw-client/src/commonMain/kotlin/org/luxons/sevenwonders/client/SevenWondersClient.kt
index 4a610a81..480e9d67 100644
--- a/sw-client/src/commonMain/kotlin/org/luxons/sevenwonders/client/SevenWondersClient.kt
+++ b/sw-client/src/commonMain/kotlin/org/luxons/sevenwonders/client/SevenWondersClient.kt
@@ -1,13 +1,15 @@
package org.luxons.sevenwonders.client
+import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.first
import kotlinx.serialization.DeserializationStrategy
import kotlinx.serialization.SerializationStrategy
import kotlinx.serialization.builtins.list
import kotlinx.serialization.builtins.serializer
import org.hildan.krossbow.stomp.StompClient
-import org.hildan.krossbow.stomp.StompSubscription
import org.hildan.krossbow.stomp.conversions.kxserialization.StompSessionWithKxSerialization
import org.hildan.krossbow.stomp.conversions.kxserialization.convertAndSend
+import org.hildan.krossbow.stomp.conversions.kxserialization.subscribe
import org.hildan.krossbow.stomp.conversions.kxserialization.withJsonConversions
import org.hildan.krossbow.stomp.sendEmptyMsg
import org.luxons.sevenwonders.model.CustomizableSettings
@@ -44,17 +46,14 @@ private suspend inline fun <reified T : Any, reified U : Any> StompSessionWithKx
): U {
val sub = subscribe(receiveDestination, deserializer)
convertAndSend(sendDestination, payload, serializer)
- val msg = sub.messages.receive()
- sub.unsubscribe()
- return msg
+ return sub.first()
}
class SevenWondersSession(private val stompSession: StompSessionWithKxSerialization) {
suspend fun disconnect() = stompSession.disconnect()
- suspend fun watchErrors(): StompSubscription<ErrorDTO> =
- stompSession.subscribe("/user/queue/errors", ErrorDTO.serializer())
+ fun watchErrors(): Flow<ErrorDTO> = stompSession.subscribe("/user/queue/errors", ErrorDTO.serializer())
suspend fun chooseName(displayName: String): ConnectedPlayer = stompSession.request(
sendDestination = "/app/chooseName",
@@ -64,7 +63,7 @@ class SevenWondersSession(private val stompSession: StompSessionWithKxSerializat
deserializer = ConnectedPlayer.serializer()
)
- suspend fun watchGames(): StompSubscription<List<LobbyDTO>> =
+ fun watchGames(): Flow<List<LobbyDTO>> =
stompSession.subscribe("/topic/games", LobbyDTO.serializer().list)
suspend fun createGame(gameName: String): LobbyDTO = stompSession.request(
@@ -95,42 +94,46 @@ class SevenWondersSession(private val stompSession: StompSessionWithKxSerializat
stompSession.convertAndSend("/app/lobby/updateSettings", UpdateSettingsAction(settings), UpdateSettingsAction.serializer())
}
- suspend fun watchLobbyUpdates(): StompSubscription<LobbyDTO> =
+ fun watchLobbyUpdates(): Flow<LobbyDTO> =
stompSession.subscribe("/user/queue/lobby/updated", LobbyDTO.serializer())
suspend fun awaitGameStart(gameId: Long): PlayerTurnInfo {
- val gameStartSubscription = stompSession.subscribe("/user/queue/lobby/$gameId/started", PlayerTurnInfo.serializer())
- val turnInfo = gameStartSubscription.messages.receive()
- gameStartSubscription.unsubscribe()
- return turnInfo
+ val startEvents = stompSession.subscribe("/user/queue/lobby/$gameId/started", PlayerTurnInfo.serializer())
+ return startEvents.first()
}
suspend fun startGame() {
stompSession.sendEmptyMsg("/app/lobby/startGame")
}
- suspend fun watchPlayerReady(gameId: Long): StompSubscription<String> =
+ fun watchPlayerReady(gameId: Long): Flow<String> =
stompSession.subscribe("/topic/game/$gameId/playerReady", String.serializer())
- suspend fun watchPreparedCards(gameId: Long): StompSubscription<PreparedCard> =
+ fun watchPreparedCards(gameId: Long): Flow<PreparedCard> =
stompSession.subscribe("/topic/game/$gameId/prepared", PreparedCard.serializer())
- suspend fun watchTurns(): StompSubscription<PlayerTurnInfo> =
+ fun watchTurns(): Flow<PlayerTurnInfo> =
stompSession.subscribe("/user/queue/game/turn", PlayerTurnInfo.serializer())
suspend fun sayReady() {
stompSession.sendEmptyMsg("/app/game/sayReady")
}
- suspend fun prepareMove(move: PlayerMove): PlayerMove = stompSession.request(
- sendDestination = "/app/game/prepareMove",
- receiveDestination = "/user/queue/game/preparedMove",
- payload = PrepareMoveAction(move),
- serializer = PrepareMoveAction.serializer(),
- deserializer = PlayerMove.serializer()
- )
+ fun watchOwnMoves(): Flow<PlayerMove> =
+ stompSession.subscribe(destination = "/user/queue/game/preparedMove", deserializer = PlayerMove.serializer())
+
+ suspend fun prepareMove(move: PlayerMove) {
+ stompSession.convertAndSend(
+ destination = "/app/game/prepareMove",
+ body = PrepareMoveAction(move),
+ serializer = PrepareMoveAction.serializer()
+ )
+ }
suspend fun unprepareMove() {
stompSession.sendEmptyMsg("/app/game/unprepareMove")
}
+
+// suspend fun watchGameEnd() {
+// }
}
diff --git a/sw-server/build.gradle.kts b/sw-server/build.gradle.kts
index fcf3ffa4..7d1ee0ae 100644
--- a/sw-server/build.gradle.kts
+++ b/sw-server/build.gradle.kts
@@ -32,7 +32,7 @@ dependencies {
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("org.hildan.jackstomp:jackstomp:2.0.0")
testImplementation("com.fasterxml.jackson.module:jackson-module-kotlin")
- testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.0-M1")
+ testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.5")
}
// packages the frontend app within the jar
diff --git a/sw-server/src/test/kotlin/org/luxons/sevenwonders/server/SevenWondersTest.kt b/sw-server/src/test/kotlin/org/luxons/sevenwonders/server/SevenWondersTest.kt
index f34234f7..c8a287f7 100644
--- a/sw-server/src/test/kotlin/org/luxons/sevenwonders/server/SevenWondersTest.kt
+++ b/sw-server/src/test/kotlin/org/luxons/sevenwonders/server/SevenWondersTest.kt
@@ -1,5 +1,7 @@
package org.luxons.sevenwonders.server
+import kotlinx.coroutines.FlowPreview
+import kotlinx.coroutines.flow.produceIn
import kotlinx.coroutines.launch
import kotlinx.coroutines.withTimeout
import kotlinx.coroutines.withTimeoutOrNull
@@ -86,10 +88,11 @@ class SevenWondersTest {
disconnect(ownerSession)
}
+ @OptIn(FlowPreview::class)
@Test
fun createGame_seenByConnectedPlayers() = runAsyncTest {
val otherSession = newPlayer("OtherPlayer")
- val games = otherSession.watchGames().messages
+ val games = otherSession.watchGames().produceIn(this)
var receivedLobbies = withTimeout(500) { games.receive() }
assertNotNull(receivedLobbies)
@@ -109,6 +112,7 @@ class SevenWondersTest {
disconnect(ownerSession, otherSession)
}
+ @OptIn(FlowPreview::class)
@Test
fun startGame_3players() = runAsyncTest {
val session1 = newPlayer("Player1")
@@ -120,27 +124,16 @@ class SevenWondersTest {
val session3 = newPlayer("Player3")
session3.joinGame(lobby.id)
- val gameStart1 = launch { session1.awaitGameStart(lobby.id) }
- val gameStart2 = launch { session2.awaitGameStart(lobby.id) }
- val gameStart3 = launch { session3.awaitGameStart(lobby.id) }
+ listOf(session1, session2, session3).forEach { session ->
+ launch {
+ session.awaitGameStart(lobby.id)
+ val turns = session.watchTurns().produceIn(this)
+ session.sayReady()
+ val turn = turns.receive()
+ assertNotNull(turn)
+ session.disconnect()
+ }
+ }
session1.startGame()
- gameStart1.join()
- gameStart2.join()
- gameStart3.join()
-
- val turns1 = session1.watchTurns().messages
- val turns2 = session2.watchTurns().messages
- val turns3 = session3.watchTurns().messages
- session1.sayReady()
- session2.sayReady()
- session3.sayReady()
- val turn1 = turns1.receive()
- val turn2 = turns2.receive()
- val turn3 = turns3.receive()
- assertNotNull(turn1)
- assertNotNull(turn2)
- assertNotNull(turn3)
-
- disconnect(session1, session2, session3)
}
}
diff --git a/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/GameBrowserSagas.kt b/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/GameBrowserSagas.kt
index 7806bc98..aa81ae55 100644
--- a/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/GameBrowserSagas.kt
+++ b/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/GameBrowserSagas.kt
@@ -1,8 +1,9 @@
package org.luxons.sevenwonders.ui.redux.sagas
+import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.flow.map
import kotlinx.coroutines.launch
-import org.hildan.krossbow.stomp.StompSubscription
import org.luxons.sevenwonders.client.SevenWondersSession
import org.luxons.sevenwonders.model.api.LobbyDTO
import org.luxons.sevenwonders.ui.redux.EnterLobbyAction
@@ -23,17 +24,18 @@ private class GameBrowserSaga(
) {
suspend fun run() {
coroutineScope {
- val gamesSubscription = session.watchGames()
- launch { dispatchGameUpdates(gamesSubscription) }
+ val gamesDispatch = launch { dispatchGameUpdates() }
val lobby = awaitCreateOrJoinGame()
- gamesSubscription.unsubscribe()
+ gamesDispatch.cancelAndJoin()
sagaContext.dispatch(EnterLobbyAction(lobby))
sagaContext.dispatch(Navigate(Route.LOBBY))
}
}
- private suspend fun dispatchGameUpdates(gamesSubscription: StompSubscription<List<LobbyDTO>>) {
- sagaContext.dispatchAll(gamesSubscription.messages) { UpdateGameListAction(it) }
+ private suspend fun dispatchGameUpdates() {
+ with(sagaContext) {
+ session.watchGames().map { UpdateGameListAction(it) }.dispatchAll()
+ }
}
private suspend fun awaitCreateOrJoinGame(): LobbyDTO = awaitFirst(this::awaitCreateGame, this::awaitJoinGame)
diff --git a/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/GameSagas.kt b/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/GameSagas.kt
index 38bb7cb7..48592778 100644
--- a/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/GameSagas.kt
+++ b/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/GameSagas.kt
@@ -1,6 +1,7 @@
package org.luxons.sevenwonders.ui.redux.sagas
import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.flow.map
import kotlinx.coroutines.launch
import org.luxons.sevenwonders.client.SevenWondersSession
import org.luxons.sevenwonders.ui.redux.PlayerReadyEvent
@@ -14,22 +15,16 @@ import org.luxons.sevenwonders.ui.redux.TurnInfoEvent
suspend fun SwSagaContext.gameSaga(session: SevenWondersSession) {
val game = getState().gameState ?: error("Game saga run without a current game")
coroutineScope {
- val playerReadySub = session.watchPlayerReady(game.id)
- val preparedCardsSub = session.watchPreparedCards(game.id)
- val turnInfoSub = session.watchTurns()
- val sayReadyJob = launch { onEach<RequestSayReady> { session.sayReady() } }
- val unprepareJob = launch { onEach<RequestUnprepareMove> { session.unprepareMove() } }
- val prepareMoveJob = launch {
- onEach<RequestPrepareMove> {
- val move = session.prepareMove(it.move)
- dispatch(PreparedMoveEvent(move))
- }
- }
- launch { dispatchAll(playerReadySub.messages) { PlayerReadyEvent(it) } }
- launch { dispatchAll(preparedCardsSub.messages) { PreparedCardEvent(it) } }
- launch { dispatchAll(turnInfoSub.messages) { TurnInfoEvent(it) } }
- // TODO await game end
- // TODO unsubscribe all subs, cancel all jobs
+ session.watchPlayerReady(game.id).map { PlayerReadyEvent(it) }.dispatchAllIn(this)
+ session.watchPreparedCards(game.id).map { PreparedCardEvent(it) }.dispatchAllIn(this)
+ session.watchOwnMoves().map { PreparedMoveEvent(it) }.dispatchAllIn(this)
+ session.watchTurns().map { TurnInfoEvent(it) }.dispatchAllIn(this)
+
+ launch { onEach<RequestSayReady> { session.sayReady() } }
+ launch { onEach<RequestPrepareMove> { session.prepareMove(it.move) } }
+ launch { onEach<RequestUnprepareMove> { session.unprepareMove() } }
+
+ // TODO await game end and cancel this scope to unsubscribe everything
}
console.log("End of game saga")
}
diff --git a/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/LobbySagas.kt b/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/LobbySagas.kt
index c3996af7..e2bf82c5 100644
--- a/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/LobbySagas.kt
+++ b/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/LobbySagas.kt
@@ -1,10 +1,9 @@
package org.luxons.sevenwonders.ui.redux.sagas
import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.flow.map
import kotlinx.coroutines.launch
-import org.hildan.krossbow.stomp.StompSubscription
import org.luxons.sevenwonders.client.SevenWondersSession
-import org.luxons.sevenwonders.model.api.LobbyDTO
import org.luxons.sevenwonders.ui.redux.EnterGameAction
import org.luxons.sevenwonders.ui.redux.RequestLeaveLobby
import org.luxons.sevenwonders.ui.redux.RequestStartGame
@@ -17,19 +16,21 @@ suspend fun SwSagaContext.lobbySaga(session: SevenWondersSession) {
val lobby = getState().currentLobby ?: error("Lobby saga run without a current lobby")
coroutineScope {
val lobbyUpdatesSubscription = session.watchLobbyUpdates()
- launch { watchLobbyUpdates(lobbyUpdatesSubscription) }
+ .map { UpdateLobbyAction(it) }
+ .dispatchAllIn(this)
+
val startGameJob = launch { awaitStartGame(session) }
awaitFirst(
{
awaitLeaveLobby(session)
- lobbyUpdatesSubscription.unsubscribe()
+ lobbyUpdatesSubscription.cancel()
startGameJob.cancel()
dispatch(Navigate(Route.GAME_BROWSER))
},
{
awaitGameStart(session, lobby.id)
- lobbyUpdatesSubscription.unsubscribe()
+ lobbyUpdatesSubscription.cancel()
startGameJob.cancel()
dispatch(Navigate(Route.GAME))
}
@@ -37,10 +38,6 @@ suspend fun SwSagaContext.lobbySaga(session: SevenWondersSession) {
}
}
-private suspend fun SwSagaContext.watchLobbyUpdates(lobbyUpdatesSubscription: StompSubscription<LobbyDTO>) {
- dispatchAll(lobbyUpdatesSubscription.messages) { UpdateLobbyAction(it) }
-}
-
private suspend fun SwSagaContext.awaitGameStart(session: SevenWondersSession, lobbyId: Long) {
val turnInfo = session.awaitGameStart(lobbyId)
val lobby = getState().currentLobby!!
diff --git a/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/Sagas.kt b/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/Sagas.kt
index 3c66d31e..565cf7ec 100644
--- a/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/Sagas.kt
+++ b/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/Sagas.kt
@@ -1,6 +1,9 @@
package org.luxons.sevenwonders.ui.redux.sagas
+import kotlinx.coroutines.CoroutineStart
+import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch
import org.luxons.sevenwonders.client.SevenWondersClient
import org.luxons.sevenwonders.client.SevenWondersSession
@@ -38,8 +41,7 @@ suspend fun SwSagaContext.rootSaga() = coroutineScope {
}
private suspend fun serverErrorSaga(session: SevenWondersSession) {
- val errorsSub = session.watchErrors()
- for (err in errorsSub.messages) {
+ session.watchErrors().collect { err ->
// TODO use blueprintjs toaster
console.error("${err.code}: ${err.message}")
console.error(JSON.stringify(err))
diff --git a/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/SagasFramework.kt b/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/SagasFramework.kt
index 1a57708e..df00f43f 100644
--- a/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/SagasFramework.kt
+++ b/sw-ui/src/main/kotlin/org/luxons/sevenwonders/ui/redux/sagas/SagasFramework.kt
@@ -6,7 +6,8 @@ import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.BroadcastChannel
-import kotlinx.coroutines.channels.ReceiveChannel
+import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch
import redux.Middleware
import redux.MiddlewareApi
@@ -79,15 +80,20 @@ class SagaContext<S, A : RAction, R>(
}
/**
- * Dispatches an action given by [createAction] for each message received in [channel].
+ * Dispatches all actions from this flow.
*/
- suspend fun <T> dispatchAll(channel: ReceiveChannel<T>, createAction: (T) -> A) {
- for (msg in channel) {
- reduxApi.dispatch(createAction(msg))
+ suspend fun Flow<A>.dispatchAll() {
+ collect {
+ reduxApi.dispatch(it)
}
}
/**
+ * Dispatches all actions from this flow in the provided [scope].
+ */
+ fun Flow<A>.dispatchAllIn(scope: CoroutineScope): Job = scope.launch { dispatchAll() }
+
+ /**
* Executes [handle] on every action dispatched. This runs forever until the current coroutine is cancelled.
*/
suspend fun onEach(handle: suspend SagaContext<S, A, R>.(A) -> Unit) {
bgstack15