summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorjbion <joffrey.bion@amadeus.com>2017-05-25 03:22:15 +0200
committerjbion <joffrey.bion@amadeus.com>2017-05-25 03:22:15 +0200
commitcfd455c04ebe374607308e4ec27668f22d78f5f7 (patch)
tree1746756933fbf24137ced4f8550df5e6c2a1ad95
parentAdd flow (diff)
downloadseven-wonders-cfd455c04ebe374607308e4ec27668f22d78f5f7.tar.gz
seven-wonders-cfd455c04ebe374607308e4ec27668f22d78f5f7.tar.bz2
seven-wonders-cfd455c04ebe374607308e4ec27668f22d78f5f7.zip
Add a simple JSON+STOMP client (to be extracted as lib)
-rw-r--r--backend/src/test/java/org/luxons/sevenwonders/test/TestStompFrameHandler.java44
-rw-r--r--backend/src/test/java/org/luxons/sevenwonders/test/client/Channel.java32
-rw-r--r--backend/src/test/java/org/luxons/sevenwonders/test/client/EmptyMsgStompFrameHandler.java29
-rw-r--r--backend/src/test/java/org/luxons/sevenwonders/test/client/JsonStompClient.java68
-rw-r--r--backend/src/test/java/org/luxons/sevenwonders/test/client/JsonStompSession.java93
-rw-r--r--backend/src/test/java/org/luxons/sevenwonders/test/client/LoggingStompSessionHandler.java40
-rw-r--r--backend/src/test/java/org/luxons/sevenwonders/test/client/QueuedStompFrameHandler.java32
7 files changed, 294 insertions, 44 deletions
diff --git a/backend/src/test/java/org/luxons/sevenwonders/test/TestStompFrameHandler.java b/backend/src/test/java/org/luxons/sevenwonders/test/TestStompFrameHandler.java
deleted file mode 100644
index 3f253a06..00000000
--- a/backend/src/test/java/org/luxons/sevenwonders/test/TestStompFrameHandler.java
+++ /dev/null
@@ -1,44 +0,0 @@
-package org.luxons.sevenwonders.test;
-
-import java.lang.reflect.Type;
-import java.util.concurrent.BlockingQueue;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.messaging.simp.stomp.StompFrameHandler;
-import org.springframework.messaging.simp.stomp.StompHeaders;
-
-public class TestStompFrameHandler<T> implements StompFrameHandler {
-
- private static final Logger logger = LoggerFactory.getLogger(TestStompFrameHandler.class);
-
- private final BlockingQueue<T> blockingQueue;
-
- private final Class<T> type;
-
- public static TestStompFrameHandler<String> defaultHandler(BlockingQueue<String> blockingQueue) {
- return new TestStompFrameHandler<>(blockingQueue, String.class);
- }
-
- public TestStompFrameHandler(BlockingQueue<T> blockingQueue, Class<T> type) {
- this.blockingQueue = blockingQueue;
- this.type = type;
- }
-
- @Override
- public Type getPayloadType(StompHeaders stompHeaders) {
- logger.info("Handler.getPayloadType, headers = " + stompHeaders);
-// throw new RuntimeException("TEST EXCEPTION");
- return type;
- }
-
- @Override
- public void handleFrame(StompHeaders stompHeaders, Object o) {
- blockingQueue.offer(type.cast(o));
-// try {
-// blockingQueue.offer(new ObjectMapper().readValue((String) o, type));
-// } catch (IOException e) {
-// throw new RuntimeException("Could not convert frame", e);
-// }
- }
-}
diff --git a/backend/src/test/java/org/luxons/sevenwonders/test/client/Channel.java b/backend/src/test/java/org/luxons/sevenwonders/test/client/Channel.java
new file mode 100644
index 00000000..6d3f1f41
--- /dev/null
+++ b/backend/src/test/java/org/luxons/sevenwonders/test/client/Channel.java
@@ -0,0 +1,32 @@
+package org.luxons.sevenwonders.test.client;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.springframework.messaging.simp.stomp.StompSession.Subscription;
+
+public class Channel<T> {
+
+ private static final int DEFAULT_RECEPTION_TIMEOUT_IN_SECONDS = 10;
+
+ private final BlockingQueue<T> messageQueue;
+
+ private final Subscription subscription;
+
+ Channel(Subscription subscription, BlockingQueue<T> messageQueue) {
+ this.subscription = subscription;
+ this.messageQueue = messageQueue;
+ }
+
+ public T next() throws InterruptedException {
+ return messageQueue.poll(DEFAULT_RECEPTION_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS);
+ }
+
+ public T next(long timeout, TimeUnit unit) throws InterruptedException {
+ return messageQueue.poll(timeout, unit);
+ }
+
+ public void unsubscribe() {
+ subscription.unsubscribe();
+ }
+}
diff --git a/backend/src/test/java/org/luxons/sevenwonders/test/client/EmptyMsgStompFrameHandler.java b/backend/src/test/java/org/luxons/sevenwonders/test/client/EmptyMsgStompFrameHandler.java
new file mode 100644
index 00000000..bd04daea
--- /dev/null
+++ b/backend/src/test/java/org/luxons/sevenwonders/test/client/EmptyMsgStompFrameHandler.java
@@ -0,0 +1,29 @@
+package org.luxons.sevenwonders.test.client;
+
+import java.lang.reflect.Type;
+import java.util.concurrent.BlockingQueue;
+
+import org.springframework.messaging.simp.stomp.StompFrameHandler;
+import org.springframework.messaging.simp.stomp.StompHeaders;
+
+class EmptyMsgStompFrameHandler implements StompFrameHandler {
+
+ private final BlockingQueue<Object> blockingQueue;
+
+ EmptyMsgStompFrameHandler(BlockingQueue<Object> blockingQueue) {
+ this.blockingQueue = blockingQueue;
+ }
+
+ @Override
+ public Type getPayloadType(StompHeaders stompHeaders) {
+ return Object.class;
+ }
+
+ @Override
+ public void handleFrame(StompHeaders stompHeaders, Object o) {
+ if (o != null) {
+ throw new IllegalArgumentException("Non-null payload in EmptyMsgStompFrameHandler");
+ }
+ blockingQueue.offer(new Object());
+ }
+}
diff --git a/backend/src/test/java/org/luxons/sevenwonders/test/client/JsonStompClient.java b/backend/src/test/java/org/luxons/sevenwonders/test/client/JsonStompClient.java
new file mode 100644
index 00000000..1e9a5c3e
--- /dev/null
+++ b/backend/src/test/java/org/luxons/sevenwonders/test/client/JsonStompClient.java
@@ -0,0 +1,68 @@
+package org.luxons.sevenwonders.test.client;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.springframework.messaging.converter.MappingJackson2MessageConverter;
+import org.springframework.messaging.simp.stomp.StompSession;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
+import org.springframework.web.socket.client.WebSocketClient;
+import org.springframework.web.socket.client.standard.StandardWebSocketClient;
+import org.springframework.web.socket.messaging.WebSocketStompClient;
+import org.springframework.web.socket.sockjs.client.SockJsClient;
+import org.springframework.web.socket.sockjs.client.Transport;
+import org.springframework.web.socket.sockjs.client.WebSocketTransport;
+
+public class JsonStompClient {
+
+ private static final long DEFAULT_CONNECTION_TIMEOUT_IN_SECONDS = 15;
+
+ private final WebSocketStompClient client;
+
+ public JsonStompClient() {
+ this(createDefaultStompClient());
+ }
+
+ public JsonStompClient(WebSocketStompClient client) {
+ this.client = client;
+ }
+
+ private static WebSocketStompClient createDefaultStompClient() {
+ WebSocketStompClient stompClient = new WebSocketStompClient(createWebSocketClient());
+ stompClient.setMessageConverter(new MappingJackson2MessageConverter()); // for custom object exchanges
+ stompClient.setTaskScheduler(createTaskScheduler()); // for heartbeats
+ return stompClient;
+ }
+
+ private static WebSocketClient createWebSocketClient() {
+ return new SockJsClient(createWsTransports());
+ }
+
+ private static List<Transport> createWsTransports() {
+ return Collections.singletonList(new WebSocketTransport(new StandardWebSocketClient()));
+ }
+
+ private static ThreadPoolTaskScheduler createTaskScheduler() {
+ ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
+ taskScheduler.afterPropertiesSet();
+ return taskScheduler;
+ }
+
+ public JsonStompSession connect(String url) throws InterruptedException, ExecutionException, TimeoutException {
+ return connect(url, DEFAULT_CONNECTION_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS);
+ }
+
+ public JsonStompSession connect(String url, long timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ StompSession session = client.connect(url, new LoggingStompSessionHandler()).get(timeout, unit);
+ session.setAutoReceipt(true);
+ return new JsonStompSession(session);
+ }
+
+ public void stop() {
+ client.stop();
+ }
+}
diff --git a/backend/src/test/java/org/luxons/sevenwonders/test/client/JsonStompSession.java b/backend/src/test/java/org/luxons/sevenwonders/test/client/JsonStompSession.java
new file mode 100644
index 00000000..d2ded856
--- /dev/null
+++ b/backend/src/test/java/org/luxons/sevenwonders/test/client/JsonStompSession.java
@@ -0,0 +1,93 @@
+package org.luxons.sevenwonders.test.client;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import org.springframework.messaging.simp.stomp.StompFrameHandler;
+import org.springframework.messaging.simp.stomp.StompHeaders;
+import org.springframework.messaging.simp.stomp.StompSession;
+
+public class JsonStompSession implements StompSession {
+
+ private final StompSession stompSession;
+
+ public JsonStompSession(StompSession stompSession) {
+ this.stompSession = stompSession;
+ }
+
+ @Override
+ public Receiptable send(String destination, Object payload) {
+ return stompSession.send(destination, payload);
+ }
+
+ @Override
+ public Receiptable send(StompHeaders headers, Object payload) {
+ return stompSession.send(headers, payload);
+ }
+
+ @Override
+ public Subscription subscribe(String destination, StompFrameHandler handler) {
+ return stompSession.subscribe(destination, handler);
+ }
+
+ @Override
+ public Subscription subscribe(StompHeaders headers, StompFrameHandler handler) {
+ return stompSession.subscribe(headers, handler);
+ }
+
+ public <T> Channel<T> subscribe(String destination, Class<T> payloadType) {
+ BlockingQueue<T> blockingQueue = new LinkedBlockingDeque<>();
+ StompFrameHandler frameHandler = new QueuedStompFrameHandler<>(blockingQueue, payloadType);
+ Subscription sub = stompSession.subscribe(destination, frameHandler);
+ return new Channel<>(sub, blockingQueue);
+ }
+
+ public Channel<Object> subscribeEmptyMsgs(String destination) {
+ BlockingQueue<Object> blockingQueue = new LinkedBlockingDeque<>();
+ StompFrameHandler frameHandler = new EmptyMsgStompFrameHandler(blockingQueue);
+ Subscription sub = stompSession.subscribe(destination, frameHandler);
+ return new Channel<>(sub, blockingQueue);
+ }
+ @Override
+ public Receiptable acknowledge(String messageId, boolean consumed) {
+ return stompSession.acknowledge(messageId, consumed);
+ }
+
+ @Override
+ public String getSessionId() {
+ return stompSession.getSessionId();
+ }
+
+ @Override
+ public boolean isConnected() {
+ return stompSession.isConnected();
+ }
+
+ @Override
+ public void setAutoReceipt(boolean enabled) {
+ stompSession.setAutoReceipt(enabled);
+ }
+
+ @Override
+ public void disconnect() {
+ stompSession.disconnect();
+ }
+
+ public <T> T request(String requestDestination, Object payload, String responseDestination, Class<T> responseType)
+ throws InterruptedException {
+ Channel<T> channel = subscribe(responseDestination, responseType);
+ send(requestDestination, payload);
+ T msg = channel.next();
+ channel.unsubscribe();
+ return msg;
+ }
+
+ public boolean request(String requestDestination, Object payload, String responseDestination)
+ throws InterruptedException {
+ Channel<Object> channel = subscribeEmptyMsgs(responseDestination);
+ send(requestDestination, payload);
+ Object msg = channel.next();
+ channel.unsubscribe();
+ return msg != null;
+ }
+}
diff --git a/backend/src/test/java/org/luxons/sevenwonders/test/client/LoggingStompSessionHandler.java b/backend/src/test/java/org/luxons/sevenwonders/test/client/LoggingStompSessionHandler.java
new file mode 100644
index 00000000..abf223fd
--- /dev/null
+++ b/backend/src/test/java/org/luxons/sevenwonders/test/client/LoggingStompSessionHandler.java
@@ -0,0 +1,40 @@
+package org.luxons.sevenwonders.test.client;
+
+import java.lang.reflect.Type;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.messaging.simp.stomp.StompCommand;
+import org.springframework.messaging.simp.stomp.StompHeaders;
+import org.springframework.messaging.simp.stomp.StompSession;
+import org.springframework.messaging.simp.stomp.StompSessionHandlerAdapter;
+
+class LoggingStompSessionHandler extends StompSessionHandlerAdapter {
+
+ private static final Logger logger = LoggerFactory.getLogger(LoggingStompSessionHandler.class);
+
+ @Override
+ public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
+ logger.info("Client connected under session id " + session.getSessionId());
+ }
+
+ @Override
+ public void handleFrame(StompHeaders headers, Object payload) {
+ }
+
+ @Override
+ public Type getPayloadType(StompHeaders headers) {
+ return String.class;
+ }
+
+ @Override
+ public void handleException(StompSession session, StompCommand command, StompHeaders headers, byte[] payload,
+ Throwable exception) {
+ logger.error("Exception thrown in session " + session.getSessionId(), exception);
+ }
+
+ @Override
+ public void handleTransportError(StompSession session, Throwable exception) {
+ logger.error("Transport exception thrown in session " + session.getSessionId(), exception);
+ }
+}
diff --git a/backend/src/test/java/org/luxons/sevenwonders/test/client/QueuedStompFrameHandler.java b/backend/src/test/java/org/luxons/sevenwonders/test/client/QueuedStompFrameHandler.java
new file mode 100644
index 00000000..7c682af0
--- /dev/null
+++ b/backend/src/test/java/org/luxons/sevenwonders/test/client/QueuedStompFrameHandler.java
@@ -0,0 +1,32 @@
+package org.luxons.sevenwonders.test.client;
+
+import java.lang.reflect.Type;
+import java.util.concurrent.BlockingQueue;
+
+import org.springframework.messaging.simp.stomp.StompFrameHandler;
+import org.springframework.messaging.simp.stomp.StompHeaders;
+
+class QueuedStompFrameHandler<T> implements StompFrameHandler {
+
+ private final BlockingQueue<T> blockingQueue;
+
+ private final Class<T> type;
+
+ QueuedStompFrameHandler(BlockingQueue<T> blockingQueue, Class<T> type) {
+ this.blockingQueue = blockingQueue;
+ this.type = type;
+ }
+
+ @Override
+ public Type getPayloadType(StompHeaders stompHeaders) {
+ return type;
+ }
+
+ @Override
+ public void handleFrame(StompHeaders stompHeaders, Object o) {
+ if (o == null) {
+ throw new IllegalArgumentException("Unsupported null payloads in this type of frame handler");
+ }
+ blockingQueue.offer(type.cast(o));
+ }
+}
bgstack15