diff --git a/subscription-example/.gitignore b/subscription-example/.gitignore new file mode 100644 index 0000000..a9c4472 --- /dev/null +++ b/subscription-example/.gitignore @@ -0,0 +1,23 @@ +.gradle +/build/ +!gradle/wrapper/gradle-wrapper.jar + ### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + ### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr +/out/ + ### NetBeans ### +/nbproject/private/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ \ No newline at end of file diff --git a/subscription-example/build.gradle b/subscription-example/build.gradle new file mode 100644 index 0000000..c8d9ef7 --- /dev/null +++ b/subscription-example/build.gradle @@ -0,0 +1,37 @@ +buildscript { + ext { + springBootVersion = '2.0.5.RELEASE' + } + repositories { + mavenCentral() + } + dependencies { + classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}") + } +} + +apply plugin: 'java' +apply plugin: 'eclipse' +apply plugin: 'org.springframework.boot' +apply plugin: 'io.spring.dependency-management' + +group = 'com.graphql-java' +version = '0.0.1-SNAPSHOT' +sourceCompatibility = 1.8 + +repositories { + mavenCentral() + maven { url "http://dl.bintray.com/andimarek/graphql-java" } +} + + +dependencies { + compile('org.springframework.boot:spring-boot-starter-web') + compile('org.springframework.boot:spring-boot-starter-websocket') + + compile "com.graphql-java:graphql-java:10.0" + compile "io.reactivex.rxjava2:rxjava:2.1.5" + compile "com.google.code.gson:gson:2.8.0" + + testCompile('org.springframework.boot:spring-boot-starter-test') +} diff --git a/subscription-example/gradle/wrapper/gradle-wrapper.jar b/subscription-example/gradle/wrapper/gradle-wrapper.jar new file mode 100644 index 0000000..1ce6e58 Binary files /dev/null and b/subscription-example/gradle/wrapper/gradle-wrapper.jar differ diff --git a/subscription-example/gradle/wrapper/gradle-wrapper.properties b/subscription-example/gradle/wrapper/gradle-wrapper.properties new file mode 100644 index 0000000..d834965 --- /dev/null +++ b/subscription-example/gradle/wrapper/gradle-wrapper.properties @@ -0,0 +1,6 @@ +#Sat Sep 15 15:12:07 AEST 2018 +distributionBase=GRADLE_USER_HOME +distributionPath=wrapper/dists +zipStoreBase=GRADLE_USER_HOME +zipStorePath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-4.8.1-all.zip diff --git a/subscription-example/gradlew b/subscription-example/gradlew new file mode 100755 index 0000000..4453cce --- /dev/null +++ b/subscription-example/gradlew @@ -0,0 +1,172 @@ +#!/usr/bin/env sh + +############################################################################## +## +## Gradle start up script for UN*X +## +############################################################################## + +# Attempt to set APP_HOME +# Resolve links: $0 may be a link +PRG="$0" +# Need this for relative symlinks. +while [ -h "$PRG" ] ; do + ls=`ls -ld "$PRG"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG=`dirname "$PRG"`"/$link" + fi +done +SAVED="`pwd`" +cd "`dirname \"$PRG\"`/" >/dev/null +APP_HOME="`pwd -P`" +cd "$SAVED" >/dev/null + +APP_NAME="Gradle" +APP_BASE_NAME=`basename "$0"` + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS="" + +# Use the maximum available, or set MAX_FD != -1 to use that value. +MAX_FD="maximum" + +warn ( ) { + echo "$*" +} + +die ( ) { + echo + echo "$*" + echo + exit 1 +} + +# OS specific support (must be 'true' or 'false'). +cygwin=false +msys=false +darwin=false +nonstop=false +case "`uname`" in + CYGWIN* ) + cygwin=true + ;; + Darwin* ) + darwin=true + ;; + MINGW* ) + msys=true + ;; + NONSTOP* ) + nonstop=true + ;; +esac + +CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar + +# Determine the Java command to use to start the JVM. +if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD="$JAVA_HOME/jre/sh/java" + else + JAVACMD="$JAVA_HOME/bin/java" + fi + if [ ! -x "$JAVACMD" ] ; then + die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +else + JAVACMD="java" + which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." +fi + +# Increase the maximum file descriptors if we can. +if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then + MAX_FD_LIMIT=`ulimit -H -n` + if [ $? -eq 0 ] ; then + if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then + MAX_FD="$MAX_FD_LIMIT" + fi + ulimit -n $MAX_FD + if [ $? -ne 0 ] ; then + warn "Could not set maximum file descriptor limit: $MAX_FD" + fi + else + warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT" + fi +fi + +# For Darwin, add options to specify how the application appears in the dock +if $darwin; then + GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\"" +fi + +# For Cygwin, switch paths to Windows format before running java +if $cygwin ; then + APP_HOME=`cygpath --path --mixed "$APP_HOME"` + CLASSPATH=`cygpath --path --mixed "$CLASSPATH"` + JAVACMD=`cygpath --unix "$JAVACMD"` + + # We build the pattern for arguments to be converted via cygpath + ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null` + SEP="" + for dir in $ROOTDIRSRAW ; do + ROOTDIRS="$ROOTDIRS$SEP$dir" + SEP="|" + done + OURCYGPATTERN="(^($ROOTDIRS))" + # Add a user-defined pattern to the cygpath arguments + if [ "$GRADLE_CYGPATTERN" != "" ] ; then + OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)" + fi + # Now convert the arguments - kludge to limit ourselves to /bin/sh + i=0 + for arg in "$@" ; do + CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -` + CHECK2=`echo "$arg"|egrep -c "^-"` ### Determine if an option + + if [ $CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then ### Added a condition + eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"` + else + eval `echo args$i`="\"$arg\"" + fi + i=$((i+1)) + done + case $i in + (0) set -- ;; + (1) set -- "$args0" ;; + (2) set -- "$args0" "$args1" ;; + (3) set -- "$args0" "$args1" "$args2" ;; + (4) set -- "$args0" "$args1" "$args2" "$args3" ;; + (5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;; + (6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;; + (7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;; + (8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;; + (9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;; + esac +fi + +# Escape application args +save ( ) { + for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done + echo " " +} +APP_ARGS=$(save "$@") + +# Collect all arguments for the java command, following the shell quoting and substitution rules +eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS" + +# by default we should be in the correct project dir, but when run from Finder on Mac, the cwd is wrong +if [ "$(uname)" = "Darwin" ] && [ "$HOME" = "$PWD" ]; then + cd "$(dirname "$0")" +fi + +exec "$JAVACMD" "$@" diff --git a/subscription-example/gradlew.bat b/subscription-example/gradlew.bat new file mode 100644 index 0000000..f955316 --- /dev/null +++ b/subscription-example/gradlew.bat @@ -0,0 +1,84 @@ +@if "%DEBUG%" == "" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +set DIRNAME=%~dp0 +if "%DIRNAME%" == "" set DIRNAME=. +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS= + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if "%ERRORLEVEL%" == "0" goto init + +echo. +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto init + +echo. +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:init +@rem Get command-line arguments, handling Windows variants + +if not "%OS%" == "Windows_NT" goto win9xME_args + +:win9xME_args +@rem Slurp the command line arguments. +set CMD_LINE_ARGS= +set _SKIP=2 + +:win9xME_args_slurp +if "x%~1" == "x" goto execute + +set CMD_LINE_ARGS=%* + +:execute +@rem Setup the command line + +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS% + +:end +@rem End local scope for the variables with windows NT shell +if "%ERRORLEVEL%"=="0" goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! +if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 +exit /b 1 + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega diff --git a/subscription-example/readme.md b/subscription-example/readme.md new file mode 100644 index 0000000..f6061fe --- /dev/null +++ b/subscription-example/readme.md @@ -0,0 +1,128 @@ +# graphql-java Subscriptions over WebSockets example + +An example of using graphql subscriptions via websockets, graphql-java, reactive-streams and RxJava. + +To build the example code in this repository type: + + ./gradlew build + +To run the example code type: + + ./gradlew bootRun + +To access the example application, point your browser at: + + http://localhost:8080/ + +# Code Explanation + +This example shows how you can use graphql-java subscription support to "subscribe" to a publisher of events. +Then as events occur, graphql-java will map the original graphql query over those same event objects and send out +a stream of `ExecutionResult` objects. + +In this example application we have a stock update type system defined as: + + type Subscription { + stockQuotes(stockCodes:[String]) : StockPriceUpdate! + } + + type StockPriceUpdate { + dateTime : String + stockCode : String + stockPrice : Float + stockPriceChange : Float + } + +The JavaScript client sends a subscription graphql query over websockets to the server: + + var query = 'subscription StockCodeSubscription { \n' + + ' stockQuotes {' + + ' dateTime\n' + + ' stockCode\n' + + ' stockPrice\n' + + ' stockPriceChange\n' + + ' }' + + '}'; + var graphqlMsg = { + query: query, + variables: {} + }; + exampleSocket.send(JSON.stringify(graphqlMsg)); + +The server executes this with the graphql-java engine: + + GraphQL graphQL = GraphQL + .newGraphQL(graphqlPublisher.getGraphQLSchema()) + .build(); + + ExecutionResult executionResult = graphQL.execute(executionInput); + +The result of that initial subscription query is a http://www.reactive-streams.org/ `Publisher` + + Publisher stockPriceStream = executionResult.getData(); + +Under the covers a RxJava 2.x implementation is used to provide a stream of synthesized stock events. + +RxJava Flows are an implementation of the reactive streams Publisher interface. You can use ANY reactive streams +implementation as a source. graphql-java uses the reactive streams interfaces as a common interface. + +See https://github.com/ReactiveX/RxJava for more information on RxJava. + +The server side code then subscribes to this publisher of events and sends the results back over the websocket +to the waiting browser client: + + stockPriceStream.subscribe(new Subscriber() { + + @Override + public void onSubscribe(Subscription s) { + subscriptionRef.set(s); + request(1); + } + + @Override + public void onNext(ExecutionResult er) { + log.debug("Sending stick price update"); + try { + Object stockPriceUpdate = er.getData(); + String json = JsonKit.toJsonString(stockPriceUpdate); + webSocketSession.sendMessage(new TextMessage(json)); + } catch (IOException e) { + e.printStackTrace(); + } + request(1); + } + + @Override + public void onError(Throwable t) { + log.error("Subscription threw an exception", t); + try { + webSocketSession.close(); + } catch (IOException e) { + log.error("Unable to close websocket session", e); + } + } + + @Override + public void onComplete() { + log.info("Subscription complete"); + try { + webSocketSession.close(); + } catch (IOException e) { + log.error("Unable to close websocket session", e); + } + } + }); + +The selection set of fields named in the original query will be applied to each underlying stock update object. + +The selection set in this example application is selected as follows: + + stockQuotes { + dateTime + stockCode + stockPrice + stockPriceChange + } + +The underling stock update object is mapped to this selection of fields, just like any normal graphql query. The format +of the results on the browser is JSON, again like any other normal graphql query. diff --git a/subscription-example/settings.gradle b/subscription-example/settings.gradle new file mode 100644 index 0000000..fb7cc46 --- /dev/null +++ b/subscription-example/settings.gradle @@ -0,0 +1 @@ +rootProject.name = 'subscription-example' diff --git a/subscription-example/src/main/java/com/graphql/java/subscription/StockTickerGraphqlPublisher.java b/subscription-example/src/main/java/com/graphql/java/subscription/StockTickerGraphqlPublisher.java new file mode 100644 index 0000000..8d79001 --- /dev/null +++ b/subscription-example/src/main/java/com/graphql/java/subscription/StockTickerGraphqlPublisher.java @@ -0,0 +1,68 @@ +package com.graphql.java.subscription; + +import com.graphql.java.subscription.data.StockTickerPublisher; +import graphql.schema.DataFetcher; +import graphql.schema.GraphQLSchema; +import graphql.schema.idl.RuntimeWiring; +import graphql.schema.idl.SchemaGenerator; +import graphql.schema.idl.SchemaParser; +import graphql.schema.idl.TypeDefinitionRegistry; +import org.springframework.stereotype.Component; + +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; +import java.util.Collections; +import java.util.List; + +import static graphql.schema.idl.TypeRuntimeWiring.newTypeWiring; + +@Component +public class StockTickerGraphqlPublisher { + private final static StockTickerPublisher STOCK_TICKER_PUBLISHER = new StockTickerPublisher(); + + private final GraphQLSchema graphQLSchema; + + public StockTickerGraphqlPublisher() { + graphQLSchema = buildSchema(); + } + + private GraphQLSchema buildSchema() { + // + // reads a file that provides the schema types + // + Reader streamReader = loadSchemaFile("stocks.graphqls"); + TypeDefinitionRegistry typeRegistry = new SchemaParser().parse(streamReader); + + RuntimeWiring wiring = RuntimeWiring.newRuntimeWiring() + .type(newTypeWiring("Subscription") + .dataFetcher("stockQuotes", stockQuotesSubscriptionFetcher()) + ) + .build(); + + return new SchemaGenerator().makeExecutableSchema(typeRegistry, wiring); + } + + private DataFetcher stockQuotesSubscriptionFetcher() { + return environment -> { + List arg = environment.getArgument("stockCodes"); + List stockCodesFilter = arg == null ? Collections.emptyList() : arg; + if (stockCodesFilter.isEmpty()) { + return STOCK_TICKER_PUBLISHER.getPublisher(); + } else { + return STOCK_TICKER_PUBLISHER.getPublisher(stockCodesFilter); + } + }; + } + + public GraphQLSchema getGraphQLSchema() { + return graphQLSchema; + } + + @SuppressWarnings("SameParameterValue") + private Reader loadSchemaFile(String name) { + InputStream stream = getClass().getClassLoader().getResourceAsStream(name); + return new InputStreamReader(stream); + } + +} diff --git a/subscription-example/src/main/java/com/graphql/java/subscription/StockTickerWebSocketHandler.java b/subscription-example/src/main/java/com/graphql/java/subscription/StockTickerWebSocketHandler.java new file mode 100644 index 0000000..eb8b905 --- /dev/null +++ b/subscription-example/src/main/java/com/graphql/java/subscription/StockTickerWebSocketHandler.java @@ -0,0 +1,132 @@ +package com.graphql.java.subscription; + +import com.graphql.java.subscription.utill.JsonKit; +import com.graphql.java.subscription.utill.QueryParameters; +import graphql.ExecutionInput; +import graphql.ExecutionResult; +import graphql.GraphQL; +import graphql.execution.instrumentation.ChainedInstrumentation; +import graphql.execution.instrumentation.Instrumentation; +import graphql.execution.instrumentation.tracing.TracingInstrumentation; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.web.socket.CloseStatus; +import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.WebSocketSession; +import org.springframework.web.socket.handler.TextWebSocketHandler; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicReference; + +import static java.util.Collections.singletonList; + +public class StockTickerWebSocketHandler extends TextWebSocketHandler { + + private static final Logger log = LoggerFactory.getLogger(StockTickerWebSocketHandler.class); + + private final StockTickerGraphqlPublisher graphqlPublisher; + private final AtomicReference subscriptionRef; + + public StockTickerWebSocketHandler(StockTickerGraphqlPublisher graphqlPublisher) { + this.graphqlPublisher = graphqlPublisher; + subscriptionRef = new AtomicReference<>(); + } + + @Override + public void afterConnectionEstablished(WebSocketSession session) throws Exception { + log.info("Websocket connection established"); + } + + @Override + public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { + log.info("Closing subscription "); + Subscription subscription = subscriptionRef.get(); + if (subscription != null) { + subscription.cancel(); + } + } + + @Override + protected void handleTextMessage(WebSocketSession webSocketSession, TextMessage message) throws Exception { + String graphqlQuery = message.getPayload(); + log.info("Websocket said {}", graphqlQuery); + + QueryParameters parameters = QueryParameters.from(graphqlQuery); + + ExecutionInput executionInput = ExecutionInput.newExecutionInput() + .query(parameters.getQuery()) + .variables(parameters.getVariables()) + .operationName(parameters.getOperationName()) + .build(); + + Instrumentation instrumentation = new ChainedInstrumentation( + singletonList(new TracingInstrumentation()) + ); + + // + // In order to have subscriptions in graphql-java you MUST use the + // SubscriptionExecutionStrategy strategy. + // + GraphQL graphQL = GraphQL + .newGraphQL(graphqlPublisher.getGraphQLSchema()) + .instrumentation(instrumentation) + .build(); + + ExecutionResult executionResult = graphQL.execute(executionInput); + + Publisher stockPriceStream = executionResult.getData(); + + stockPriceStream.subscribe(new Subscriber() { + + @Override + public void onSubscribe(Subscription s) { + subscriptionRef.set(s); + request(1); + } + + @Override + public void onNext(ExecutionResult er) { + log.debug("Sending stick price update"); + try { + Object stockPriceUpdate = er.getData(); + String json = JsonKit.toJsonString(stockPriceUpdate); + webSocketSession.sendMessage(new TextMessage(json)); + } catch (IOException e) { + e.printStackTrace(); + } + request(1); + } + + @Override + public void onError(Throwable t) { + log.error("Subscription threw an exception", t); + try { + webSocketSession.close(); + } catch (IOException e) { + log.error("Unable to close websocket session", e); + } + } + + @Override + public void onComplete() { + log.info("Subscription complete"); + try { + webSocketSession.close(); + } catch (IOException e) { + log.error("Unable to close websocket session", e); + } + } + }); + } + + private void request(int n) { + Subscription subscription = subscriptionRef.get(); + if (subscription != null) { + subscription.request(n); + } + } + +} diff --git a/subscription-example/src/main/java/com/graphql/java/subscription/StocktickerWebSocketConfig.java b/subscription-example/src/main/java/com/graphql/java/subscription/StocktickerWebSocketConfig.java new file mode 100644 index 0000000..e136887 --- /dev/null +++ b/subscription-example/src/main/java/com/graphql/java/subscription/StocktickerWebSocketConfig.java @@ -0,0 +1,24 @@ +package com.graphql.java.subscription; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.socket.config.annotation.EnableWebSocket; +import org.springframework.web.socket.config.annotation.WebSocketConfigurer; +import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry; + +@Configuration +@EnableWebSocket +public class StocktickerWebSocketConfig implements WebSocketConfigurer { + + private final StockTickerGraphqlPublisher stockTickerGraphqlPublisher; + + @Autowired + public StocktickerWebSocketConfig(StockTickerGraphqlPublisher stockTickerGraphqlPublisher) { + this.stockTickerGraphqlPublisher = stockTickerGraphqlPublisher; + } + + public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { + registry.addHandler(new StockTickerWebSocketHandler(stockTickerGraphqlPublisher), "/stockticker").setAllowedOrigins("*"); + } +} + diff --git a/subscription-example/src/main/java/com/graphql/java/subscription/SubscriptionExampleApplication.java b/subscription-example/src/main/java/com/graphql/java/subscription/SubscriptionExampleApplication.java new file mode 100644 index 0000000..6f4cdbb --- /dev/null +++ b/subscription-example/src/main/java/com/graphql/java/subscription/SubscriptionExampleApplication.java @@ -0,0 +1,12 @@ +package com.graphql.java.subscription; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class SubscriptionExampleApplication { + + public static void main(String[] args) { + SpringApplication.run(SubscriptionExampleApplication.class, args); + } +} diff --git a/subscription-example/src/main/java/com/graphql/java/subscription/data/StockPriceUpdate.java b/subscription-example/src/main/java/com/graphql/java/subscription/data/StockPriceUpdate.java new file mode 100644 index 0000000..3eab8bf --- /dev/null +++ b/subscription-example/src/main/java/com/graphql/java/subscription/data/StockPriceUpdate.java @@ -0,0 +1,36 @@ +package com.graphql.java.subscription.data; + +import java.math.BigDecimal; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; + +public class StockPriceUpdate { + + private final String stockCode; + private final String dateTime; + private final BigDecimal stockPrice; + private final BigDecimal stockPriceChange; + + public StockPriceUpdate(String stockCode, LocalDateTime dateTime, BigDecimal stockPrice, BigDecimal stockPriceChange) { + this.stockCode = stockCode; + this.dateTime = dateTime.format(DateTimeFormatter.ISO_DATE_TIME); + this.stockPrice = stockPrice; + this.stockPriceChange = stockPriceChange; + } + + public String getStockCode() { + return stockCode; + } + + public String getDateTime() { + return dateTime; + } + + public BigDecimal getStockPrice() { + return stockPrice; + } + + public BigDecimal getStockPriceChange() { + return stockPriceChange; + } +} diff --git a/subscription-example/src/main/java/com/graphql/java/subscription/data/StockTickerPublisher.java b/subscription-example/src/main/java/com/graphql/java/subscription/data/StockTickerPublisher.java new file mode 100644 index 0000000..cdae5b9 --- /dev/null +++ b/subscription-example/src/main/java/com/graphql/java/subscription/data/StockTickerPublisher.java @@ -0,0 +1,120 @@ +package com.graphql.java.subscription.data; + +import io.reactivex.BackpressureStrategy; +import io.reactivex.Flowable; +import io.reactivex.Observable; +import io.reactivex.ObservableEmitter; +import io.reactivex.observables.ConnectableObservable; + +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class StockTickerPublisher { + + private final Flowable publisher; + + public StockTickerPublisher() { + Observable stockPriceUpdateObservable = Observable.create(emitter -> { + + ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1); + executorService.scheduleAtFixedRate(newStockTick(emitter), 0, 2, TimeUnit.SECONDS); + + }); + + ConnectableObservable connectableObservable = stockPriceUpdateObservable.share().publish(); + connectableObservable.connect(); + + publisher = connectableObservable.toFlowable(BackpressureStrategy.BUFFER); + } + + private Runnable newStockTick(ObservableEmitter emitter) { + return () -> { + List stockPriceUpdates = getUpdates(rollDice(0, 5)); + if (stockPriceUpdates != null) { + emitStocks(emitter, stockPriceUpdates); + } + }; + } + + private void emitStocks(ObservableEmitter emitter, List stockPriceUpdates) { + for (StockPriceUpdate stockPriceUpdate : stockPriceUpdates) { + try { + emitter.onNext(stockPriceUpdate); + } catch (RuntimeException rte) { + rte.printStackTrace(); + } + } + } + + public Flowable getPublisher() { + return publisher; + } + + public Flowable getPublisher(List stockCodes) { + return publisher.filter(stockPriceUpdate -> stockCodes.contains(stockPriceUpdate.getStockCode())); + } + + private List getUpdates(int number) { + List updates = new ArrayList<>(); + for (int i = 0; i < number; i++) { + updates.add(rollUpdate()); + } + return updates; + } + + + private final static Map CURRENT_STOCK_PRICES = new ConcurrentHashMap<>(); + + static { + CURRENT_STOCK_PRICES.put("TEAM", dollars(39, 64)); + CURRENT_STOCK_PRICES.put("IBM", dollars(147, 10)); + CURRENT_STOCK_PRICES.put("AMZN", dollars(1002, 94)); + CURRENT_STOCK_PRICES.put("MSFT", dollars(77, 49)); + CURRENT_STOCK_PRICES.put("GOOGL", dollars(1007, 87)); + } + + private StockPriceUpdate rollUpdate() { + ArrayList STOCK_CODES = new ArrayList<>(CURRENT_STOCK_PRICES.keySet()); + + String stockCode = STOCK_CODES.get(rollDice(0, STOCK_CODES.size() - 1)); + BigDecimal currentPrice = CURRENT_STOCK_PRICES.get(stockCode); + + + BigDecimal incrementDollars = dollars(rollDice(0, 1), rollDice(0, 99)); + if (rollDice(0, 10) > 7) { + // 0.3 of the time go down + incrementDollars = incrementDollars.negate(); + } + BigDecimal newPrice = currentPrice.add(incrementDollars); + + CURRENT_STOCK_PRICES.put(stockCode, newPrice); + return new StockPriceUpdate(stockCode, LocalDateTime.now(), newPrice, incrementDollars); + } + + private static BigDecimal dollars(int dollars, int cents) { + return truncate("" + dollars + "." + cents); + } + + private static BigDecimal truncate(final String text) { + BigDecimal bigDecimal = new BigDecimal(text); + if (bigDecimal.scale() > 2) + bigDecimal = new BigDecimal(text).setScale(2, RoundingMode.HALF_UP); + return bigDecimal.stripTrailingZeros(); + } + + private final static Random rand = new Random(); + + private static int rollDice(int min, int max) { + return rand.nextInt((max - min) + 1) + min; + } + +} diff --git a/subscription-example/src/main/java/com/graphql/java/subscription/utill/JsonKit.java b/subscription-example/src/main/java/com/graphql/java/subscription/utill/JsonKit.java new file mode 100644 index 0000000..965cdd3 --- /dev/null +++ b/subscription-example/src/main/java/com/graphql/java/subscription/utill/JsonKit.java @@ -0,0 +1,37 @@ +package com.graphql.java.subscription.utill; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.reflect.TypeToken; + +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.util.Collections; +import java.util.Map; + +/** + * This example code chose to use GSON as its JSON parser. Any JSON parser should be fine + */ +public class JsonKit { + private static final Gson GSON = new GsonBuilder() + // + // This is important because the graphql spec says that null values should be present + // + .serializeNulls() + .create(); + + public static Map toMap(String jsonStr) { + if (jsonStr == null || jsonStr.trim().length() == 0) { + return Collections.emptyMap(); + } + // gson uses type tokens for generic input like Map + TypeToken> typeToken = new TypeToken>() { + }; + Map map = GSON.fromJson(jsonStr, typeToken.getType()); + return map == null ? Collections.emptyMap() : map; + } + + public static String toJsonString(Object obj) { + return GSON.toJson(obj); + } +} diff --git a/subscription-example/src/main/java/com/graphql/java/subscription/utill/QueryParameters.java b/subscription-example/src/main/java/com/graphql/java/subscription/utill/QueryParameters.java new file mode 100644 index 0000000..f563686 --- /dev/null +++ b/subscription-example/src/main/java/com/graphql/java/subscription/utill/QueryParameters.java @@ -0,0 +1,58 @@ +package com.graphql.java.subscription.utill; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +/** + * Graphql clients can send GET or POST HTTP requests. The spec does not make an explicit + * distinction. So you may need to handle both. The following was tested using + * a graphiql client tool found here : https://github.com/skevy/graphiql-app + * + * You should consider bundling graphiql in your application + * + * https://github.com/graphql/graphiql + * + * This outlines more information on how to handle parameters over http + * + * http://graphql.org/learn/serving-over-http/ + */ +public class QueryParameters { + + private String query; + private String operationName; + private Map variables = Collections.emptyMap(); + + public String getQuery() { + return query; + } + + public String getOperationName() { + return operationName; + } + + public Map getVariables() { + return variables; + } + + public static QueryParameters from(String queryMessage) { + QueryParameters parameters = new QueryParameters(); + Map json = JsonKit.toMap(queryMessage); + parameters.query = (String) json.get("query"); + parameters.operationName = (String) json.get("operationName"); + parameters.variables = getVariables(json.get("variables")); + return parameters; + } + + + private static Map getVariables(Object variables) { + if (variables instanceof Map) { + Map inputVars = (Map) variables; + Map vars = new HashMap<>(); + inputVars.forEach((k, v) -> vars.put(String.valueOf(k), v)); + return vars; + } + return JsonKit.toMap(String.valueOf(variables)); + } + +} diff --git a/subscription-example/src/main/resources/application.properties b/subscription-example/src/main/resources/application.properties new file mode 100644 index 0000000..e69de29 diff --git a/subscription-example/src/main/resources/static/index.html b/subscription-example/src/main/resources/static/index.html new file mode 100644 index 0000000..9e304fc --- /dev/null +++ b/subscription-example/src/main/resources/static/index.html @@ -0,0 +1,173 @@ + + + + + Subscriptions over Web Sockets + + + + + + + + + + + +
+

graphql-java Subscriptions

+

An example of graphql-java subscriptions sending the continous updates over websockets

+
+ + +
+
+
+ graphql-java +
+
+

Explanation

+

This demonstrates the use of graphql subscriptions and web sockets to send a stream of imagined stock price + updates to this page.

+

The updates are continuously sent from a server side publish and subscribe system (RxJava in this case) and + pushed + down to the browser client while applying graphql shapes to the subscription data

+

The graphql query used in this example is :

+
+subscription StockCodeSubscription {
+    stockQuotes {
+        dateTime
+        stockCode
+        stockPrice
+        stockPriceChange
+    }
+}
+            
+
+
+

Stock Price Updates

+
Pending subscription...
+
📡
+
+
+
+ + \ No newline at end of file diff --git a/subscription-example/src/main/resources/stocks.graphqls b/subscription-example/src/main/resources/stocks.graphqls new file mode 100644 index 0000000..24b994a --- /dev/null +++ b/subscription-example/src/main/resources/stocks.graphqls @@ -0,0 +1,22 @@ +# +# Schemas must have at least a query root type +# +schema { + query: Query + subscription : Subscription +} + +type Query { + helo : String +} + +type Subscription { + stockQuotes(stockCodes:[String]) : StockPriceUpdate! +} + +type StockPriceUpdate { + dateTime : String + stockCode : String + stockPrice : Float + stockPriceChange : Float +}