diff options
author | jbion <joffrey.bion@amadeus.com> | 2017-05-25 03:22:15 +0200 |
---|---|---|
committer | jbion <joffrey.bion@amadeus.com> | 2017-05-25 03:22:15 +0200 |
commit | cfd455c04ebe374607308e4ec27668f22d78f5f7 (patch) | |
tree | 1746756933fbf24137ced4f8550df5e6c2a1ad95 /backend | |
parent | Add flow (diff) | |
download | seven-wonders-cfd455c04ebe374607308e4ec27668f22d78f5f7.tar.gz seven-wonders-cfd455c04ebe374607308e4ec27668f22d78f5f7.tar.bz2 seven-wonders-cfd455c04ebe374607308e4ec27668f22d78f5f7.zip |
Add a simple JSON+STOMP client (to be extracted as lib)
Diffstat (limited to 'backend')
7 files changed, 294 insertions, 44 deletions
diff --git a/backend/src/test/java/org/luxons/sevenwonders/test/TestStompFrameHandler.java b/backend/src/test/java/org/luxons/sevenwonders/test/TestStompFrameHandler.java deleted file mode 100644 index 3f253a06..00000000 --- a/backend/src/test/java/org/luxons/sevenwonders/test/TestStompFrameHandler.java +++ /dev/null @@ -1,44 +0,0 @@ -package org.luxons.sevenwonders.test; - -import java.lang.reflect.Type; -import java.util.concurrent.BlockingQueue; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.messaging.simp.stomp.StompFrameHandler; -import org.springframework.messaging.simp.stomp.StompHeaders; - -public class TestStompFrameHandler<T> implements StompFrameHandler { - - private static final Logger logger = LoggerFactory.getLogger(TestStompFrameHandler.class); - - private final BlockingQueue<T> blockingQueue; - - private final Class<T> type; - - public static TestStompFrameHandler<String> defaultHandler(BlockingQueue<String> blockingQueue) { - return new TestStompFrameHandler<>(blockingQueue, String.class); - } - - public TestStompFrameHandler(BlockingQueue<T> blockingQueue, Class<T> type) { - this.blockingQueue = blockingQueue; - this.type = type; - } - - @Override - public Type getPayloadType(StompHeaders stompHeaders) { - logger.info("Handler.getPayloadType, headers = " + stompHeaders); -// throw new RuntimeException("TEST EXCEPTION"); - return type; - } - - @Override - public void handleFrame(StompHeaders stompHeaders, Object o) { - blockingQueue.offer(type.cast(o)); -// try { -// blockingQueue.offer(new ObjectMapper().readValue((String) o, type)); -// } catch (IOException e) { -// throw new RuntimeException("Could not convert frame", e); -// } - } -} diff --git a/backend/src/test/java/org/luxons/sevenwonders/test/client/Channel.java b/backend/src/test/java/org/luxons/sevenwonders/test/client/Channel.java new file mode 100644 index 00000000..6d3f1f41 --- /dev/null +++ b/backend/src/test/java/org/luxons/sevenwonders/test/client/Channel.java @@ -0,0 +1,32 @@ +package org.luxons.sevenwonders.test.client; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.springframework.messaging.simp.stomp.StompSession.Subscription; + +public class Channel<T> { + + private static final int DEFAULT_RECEPTION_TIMEOUT_IN_SECONDS = 10; + + private final BlockingQueue<T> messageQueue; + + private final Subscription subscription; + + Channel(Subscription subscription, BlockingQueue<T> messageQueue) { + this.subscription = subscription; + this.messageQueue = messageQueue; + } + + public T next() throws InterruptedException { + return messageQueue.poll(DEFAULT_RECEPTION_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS); + } + + public T next(long timeout, TimeUnit unit) throws InterruptedException { + return messageQueue.poll(timeout, unit); + } + + public void unsubscribe() { + subscription.unsubscribe(); + } +} diff --git a/backend/src/test/java/org/luxons/sevenwonders/test/client/EmptyMsgStompFrameHandler.java b/backend/src/test/java/org/luxons/sevenwonders/test/client/EmptyMsgStompFrameHandler.java new file mode 100644 index 00000000..bd04daea --- /dev/null +++ b/backend/src/test/java/org/luxons/sevenwonders/test/client/EmptyMsgStompFrameHandler.java @@ -0,0 +1,29 @@ +package org.luxons.sevenwonders.test.client; + +import java.lang.reflect.Type; +import java.util.concurrent.BlockingQueue; + +import org.springframework.messaging.simp.stomp.StompFrameHandler; +import org.springframework.messaging.simp.stomp.StompHeaders; + +class EmptyMsgStompFrameHandler implements StompFrameHandler { + + private final BlockingQueue<Object> blockingQueue; + + EmptyMsgStompFrameHandler(BlockingQueue<Object> blockingQueue) { + this.blockingQueue = blockingQueue; + } + + @Override + public Type getPayloadType(StompHeaders stompHeaders) { + return Object.class; + } + + @Override + public void handleFrame(StompHeaders stompHeaders, Object o) { + if (o != null) { + throw new IllegalArgumentException("Non-null payload in EmptyMsgStompFrameHandler"); + } + blockingQueue.offer(new Object()); + } +} diff --git a/backend/src/test/java/org/luxons/sevenwonders/test/client/JsonStompClient.java b/backend/src/test/java/org/luxons/sevenwonders/test/client/JsonStompClient.java new file mode 100644 index 00000000..1e9a5c3e --- /dev/null +++ b/backend/src/test/java/org/luxons/sevenwonders/test/client/JsonStompClient.java @@ -0,0 +1,68 @@ +package org.luxons.sevenwonders.test.client; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.springframework.messaging.converter.MappingJackson2MessageConverter; +import org.springframework.messaging.simp.stomp.StompSession; +import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; +import org.springframework.web.socket.client.WebSocketClient; +import org.springframework.web.socket.client.standard.StandardWebSocketClient; +import org.springframework.web.socket.messaging.WebSocketStompClient; +import org.springframework.web.socket.sockjs.client.SockJsClient; +import org.springframework.web.socket.sockjs.client.Transport; +import org.springframework.web.socket.sockjs.client.WebSocketTransport; + +public class JsonStompClient { + + private static final long DEFAULT_CONNECTION_TIMEOUT_IN_SECONDS = 15; + + private final WebSocketStompClient client; + + public JsonStompClient() { + this(createDefaultStompClient()); + } + + public JsonStompClient(WebSocketStompClient client) { + this.client = client; + } + + private static WebSocketStompClient createDefaultStompClient() { + WebSocketStompClient stompClient = new WebSocketStompClient(createWebSocketClient()); + stompClient.setMessageConverter(new MappingJackson2MessageConverter()); // for custom object exchanges + stompClient.setTaskScheduler(createTaskScheduler()); // for heartbeats + return stompClient; + } + + private static WebSocketClient createWebSocketClient() { + return new SockJsClient(createWsTransports()); + } + + private static List<Transport> createWsTransports() { + return Collections.singletonList(new WebSocketTransport(new StandardWebSocketClient())); + } + + private static ThreadPoolTaskScheduler createTaskScheduler() { + ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); + taskScheduler.afterPropertiesSet(); + return taskScheduler; + } + + public JsonStompSession connect(String url) throws InterruptedException, ExecutionException, TimeoutException { + return connect(url, DEFAULT_CONNECTION_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS); + } + + public JsonStompSession connect(String url, long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + StompSession session = client.connect(url, new LoggingStompSessionHandler()).get(timeout, unit); + session.setAutoReceipt(true); + return new JsonStompSession(session); + } + + public void stop() { + client.stop(); + } +} diff --git a/backend/src/test/java/org/luxons/sevenwonders/test/client/JsonStompSession.java b/backend/src/test/java/org/luxons/sevenwonders/test/client/JsonStompSession.java new file mode 100644 index 00000000..d2ded856 --- /dev/null +++ b/backend/src/test/java/org/luxons/sevenwonders/test/client/JsonStompSession.java @@ -0,0 +1,93 @@ +package org.luxons.sevenwonders.test.client; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; + +import org.springframework.messaging.simp.stomp.StompFrameHandler; +import org.springframework.messaging.simp.stomp.StompHeaders; +import org.springframework.messaging.simp.stomp.StompSession; + +public class JsonStompSession implements StompSession { + + private final StompSession stompSession; + + public JsonStompSession(StompSession stompSession) { + this.stompSession = stompSession; + } + + @Override + public Receiptable send(String destination, Object payload) { + return stompSession.send(destination, payload); + } + + @Override + public Receiptable send(StompHeaders headers, Object payload) { + return stompSession.send(headers, payload); + } + + @Override + public Subscription subscribe(String destination, StompFrameHandler handler) { + return stompSession.subscribe(destination, handler); + } + + @Override + public Subscription subscribe(StompHeaders headers, StompFrameHandler handler) { + return stompSession.subscribe(headers, handler); + } + + public <T> Channel<T> subscribe(String destination, Class<T> payloadType) { + BlockingQueue<T> blockingQueue = new LinkedBlockingDeque<>(); + StompFrameHandler frameHandler = new QueuedStompFrameHandler<>(blockingQueue, payloadType); + Subscription sub = stompSession.subscribe(destination, frameHandler); + return new Channel<>(sub, blockingQueue); + } + + public Channel<Object> subscribeEmptyMsgs(String destination) { + BlockingQueue<Object> blockingQueue = new LinkedBlockingDeque<>(); + StompFrameHandler frameHandler = new EmptyMsgStompFrameHandler(blockingQueue); + Subscription sub = stompSession.subscribe(destination, frameHandler); + return new Channel<>(sub, blockingQueue); + } + @Override + public Receiptable acknowledge(String messageId, boolean consumed) { + return stompSession.acknowledge(messageId, consumed); + } + + @Override + public String getSessionId() { + return stompSession.getSessionId(); + } + + @Override + public boolean isConnected() { + return stompSession.isConnected(); + } + + @Override + public void setAutoReceipt(boolean enabled) { + stompSession.setAutoReceipt(enabled); + } + + @Override + public void disconnect() { + stompSession.disconnect(); + } + + public <T> T request(String requestDestination, Object payload, String responseDestination, Class<T> responseType) + throws InterruptedException { + Channel<T> channel = subscribe(responseDestination, responseType); + send(requestDestination, payload); + T msg = channel.next(); + channel.unsubscribe(); + return msg; + } + + public boolean request(String requestDestination, Object payload, String responseDestination) + throws InterruptedException { + Channel<Object> channel = subscribeEmptyMsgs(responseDestination); + send(requestDestination, payload); + Object msg = channel.next(); + channel.unsubscribe(); + return msg != null; + } +} diff --git a/backend/src/test/java/org/luxons/sevenwonders/test/client/LoggingStompSessionHandler.java b/backend/src/test/java/org/luxons/sevenwonders/test/client/LoggingStompSessionHandler.java new file mode 100644 index 00000000..abf223fd --- /dev/null +++ b/backend/src/test/java/org/luxons/sevenwonders/test/client/LoggingStompSessionHandler.java @@ -0,0 +1,40 @@ +package org.luxons.sevenwonders.test.client; + +import java.lang.reflect.Type; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.messaging.simp.stomp.StompCommand; +import org.springframework.messaging.simp.stomp.StompHeaders; +import org.springframework.messaging.simp.stomp.StompSession; +import org.springframework.messaging.simp.stomp.StompSessionHandlerAdapter; + +class LoggingStompSessionHandler extends StompSessionHandlerAdapter { + + private static final Logger logger = LoggerFactory.getLogger(LoggingStompSessionHandler.class); + + @Override + public void afterConnected(StompSession session, StompHeaders connectedHeaders) { + logger.info("Client connected under session id " + session.getSessionId()); + } + + @Override + public void handleFrame(StompHeaders headers, Object payload) { + } + + @Override + public Type getPayloadType(StompHeaders headers) { + return String.class; + } + + @Override + public void handleException(StompSession session, StompCommand command, StompHeaders headers, byte[] payload, + Throwable exception) { + logger.error("Exception thrown in session " + session.getSessionId(), exception); + } + + @Override + public void handleTransportError(StompSession session, Throwable exception) { + logger.error("Transport exception thrown in session " + session.getSessionId(), exception); + } +} diff --git a/backend/src/test/java/org/luxons/sevenwonders/test/client/QueuedStompFrameHandler.java b/backend/src/test/java/org/luxons/sevenwonders/test/client/QueuedStompFrameHandler.java new file mode 100644 index 00000000..7c682af0 --- /dev/null +++ b/backend/src/test/java/org/luxons/sevenwonders/test/client/QueuedStompFrameHandler.java @@ -0,0 +1,32 @@ +package org.luxons.sevenwonders.test.client; + +import java.lang.reflect.Type; +import java.util.concurrent.BlockingQueue; + +import org.springframework.messaging.simp.stomp.StompFrameHandler; +import org.springframework.messaging.simp.stomp.StompHeaders; + +class QueuedStompFrameHandler<T> implements StompFrameHandler { + + private final BlockingQueue<T> blockingQueue; + + private final Class<T> type; + + QueuedStompFrameHandler(BlockingQueue<T> blockingQueue, Class<T> type) { + this.blockingQueue = blockingQueue; + this.type = type; + } + + @Override + public Type getPayloadType(StompHeaders stompHeaders) { + return type; + } + + @Override + public void handleFrame(StompHeaders stompHeaders, Object o) { + if (o == null) { + throw new IllegalArgumentException("Unsupported null payloads in this type of frame handler"); + } + blockingQueue.offer(type.cast(o)); + } +} |