Skip to content

Commit

Permalink
feat(server): generic GraphQL WS subscription support (#1810)
Browse files Browse the repository at this point in the history
Add [GraphQL WS subscription
protocol](https://github.com/enisdenjo/graphql-ws/blob/master/PROTOCOL.md)
implementation to the `graphql-kotlin-server`. Both Ktor and Spring
servers can now use the same `Flow` based implementation.

Spring server defaults to the new `graphql-transport-ws` protocol. In
order to opt-in to use the legacy Apollo `subscription-transport-ws` you
need to explicitly specify
`graphql.subscriptions.protocol=APOLLO_SUBSCRIPTIONS_WS ` property.

Deprecated all existing Spring classes related to [Apollo
subscription-transport-ws](https://github.com/apollographql/subscriptions-transport-ws)
protocol.
  • Loading branch information
dariuszkuc authored Jul 7, 2023
1 parent 707731a commit 74ddab7
Show file tree
Hide file tree
Showing 58 changed files with 1,931 additions and 1,332 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ tasks {
jacoco {
toolVersion = libs.versions.jacoco.get()
}
jacocoTestCoverageVerification {
finalizedBy(jacocoTestReport)
}
jacocoTestReport {
dependsOn(test)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
graphql:
packages:
- "com.expediagroup.graphql.examples.server.spring"
playground:
enabled: true
subscriptions:
# Send a ka message every 1000 ms (1 second)
keepAliveInterval: 1000
protocol: "APOLLO_SUBSCRIPTIONS_WS"
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@
package com.expediagroup.graphql.examples.server.spring.subscriptions

import com.expediagroup.graphql.examples.server.spring.SUBSCRIPTION_ENDPOINT
import com.expediagroup.graphql.server.spring.subscriptions.SubscriptionOperationMessage
import com.expediagroup.graphql.server.spring.subscriptions.SubscriptionOperationMessage.ClientMessages.GQL_CONNECTION_INIT
import com.expediagroup.graphql.server.spring.subscriptions.SubscriptionOperationMessage.ClientMessages.GQL_START
import com.expediagroup.graphql.server.spring.subscriptions.SubscriptionOperationMessage.ServerMessages
import com.expediagroup.graphql.server.spring.subscriptions.ApolloSubscriptionOperationMessage
import com.expediagroup.graphql.server.types.GraphQLRequest
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
Expand Down Expand Up @@ -168,23 +165,25 @@ class SimpleSubscriptionIT(@LocalServerPort private var port: Int) {
.then(session.send(Flux.just(session.textMessage(startMessage))))
.thenMany(
session.receive()
.map { objectMapper.readValue<SubscriptionOperationMessage>(it.payloadAsText) }
.map { objectMapper.readValue<ApolloSubscriptionOperationMessage>(it.payloadAsText) }
.doOnNext {
if (it.type == ServerMessages.GQL_DATA.type) {
if (it.type == ApolloSubscriptionOperationMessage.ServerMessages.GQL_DATA.type) {
val data = objectMapper.writeValueAsString(it.payload)
output.next(data)
} else if (it.type == ServerMessages.GQL_COMPLETE.type) {
} else if (it.type == ApolloSubscriptionOperationMessage.ServerMessages.GQL_COMPLETE.type) {
output.complete()
}
}
)
.then()
}

private fun SubscriptionOperationMessage.toJson() = objectMapper.writeValueAsString(this)
private fun getInitMessage(id: String, payload: Any?) = SubscriptionOperationMessage(GQL_CONNECTION_INIT.type, id = id, payload = payload).toJson()
private fun ApolloSubscriptionOperationMessage.toJson() = objectMapper.writeValueAsString(this)
private fun getInitMessage(id: String, payload: Any?) = ApolloSubscriptionOperationMessage(
ApolloSubscriptionOperationMessage.ClientMessages.GQL_CONNECTION_INIT.type, id = id, payload = payload
).toJson()
private fun getStartMessage(query: String, id: String): String {
val request = GraphQLRequest("subscription { $query }")
return SubscriptionOperationMessage(GQL_START.type, id = id, payload = request).toJson()
return ApolloSubscriptionOperationMessage(ApolloSubscriptionOperationMessage.ClientMessages.GQL_START.type, id = id, payload = request).toJson()
}
}
4 changes: 2 additions & 2 deletions servers/graphql-kotlin-ktor-server/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ tasks {
limit {
counter = "INSTRUCTION"
value = "COVEREDRATIO"
minimum = "0.60".toBigDecimal()
minimum = "0.70".toBigDecimal()
}
limit {
counter = "BRANCH"
value = "COVEREDRATIO"
minimum = "0.35".toBigDecimal()
minimum = "0.45".toBigDecimal()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,8 @@ import com.expediagroup.graphql.generator.federation.FederatedSchemaGeneratorHoo
import com.expediagroup.graphql.generator.federation.FederatedSimpleTypeResolver
import com.expediagroup.graphql.generator.federation.toFederatedSchema
import com.expediagroup.graphql.generator.internal.state.ClassScanner
import com.expediagroup.graphql.server.execution.DefaultGraphQLSubscriptionExecutor
import com.expediagroup.graphql.server.execution.GraphQLRequestHandler
import com.expediagroup.graphql.server.ktor.subscriptions.KtorGraphQLSubscriptionHandler
import com.expediagroup.graphql.server.ktor.subscriptions.DefaultKtorGraphQLSubscriptionHooks
import com.expediagroup.graphql.server.ktor.subscriptions.graphqlws.KtorGraphQLWebSocketProtocolHandler
import com.expediagroup.graphql.server.ktor.subscriptions.KtorGraphQLWebSocketServer
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import graphql.execution.AsyncExecutionStrategy
import graphql.execution.AsyncSerialExecutionStrategy
Expand Down Expand Up @@ -156,23 +153,25 @@ class GraphQL(config: GraphQLConfiguration) {
.build()

// TODO cannot override the request handler/server as it requires access to graphql engine
private val requestHandler: GraphQLRequestHandler = GraphQLRequestHandler(
graphQL = engine,
dataLoaderRegistryFactory = config.engine.dataLoaderRegistryFactory
)

val server: KtorGraphQLServer = KtorGraphQLServer(
requestParser = config.server.requestParser,
contextFactory = config.server.contextFactory,
requestHandler = GraphQLRequestHandler(
graphQL = engine,
dataLoaderRegistryFactory = config.engine.dataLoaderRegistryFactory
)
requestHandler = requestHandler
)

val subscriptionsHandler: KtorGraphQLSubscriptionHandler by lazy {
KtorGraphQLWebSocketProtocolHandler(
subscriptionExecutor = DefaultGraphQLSubscriptionExecutor(
graphQL = engine,
dataLoaderRegistryFactory = config.engine.dataLoaderRegistryFactory,
),
objectMapper = jacksonObjectMapper().apply(config.server.jacksonConfiguration),
subscriptionHooks = DefaultKtorGraphQLSubscriptionHooks(),
val subscriptionServer: KtorGraphQLWebSocketServer by lazy {
KtorGraphQLWebSocketServer(
requestParser = config.server.subscriptions.requestParser,
contextFactory = config.server.subscriptions.contextFactory,
subscriptionHooks = config.server.subscriptions.hooks,
requestHandler = requestHandler,
initTimeoutMillis = config.server.subscriptions.connectionInitTimeout,
objectMapper = jacksonObjectMapper().apply(config.server.jacksonConfiguration)
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ import com.expediagroup.graphql.generator.hooks.FlowSubscriptionSchemaGeneratorH
import com.expediagroup.graphql.generator.hooks.SchemaGeneratorHooks
import com.expediagroup.graphql.generator.scalars.IDValueUnboxer
import com.expediagroup.graphql.server.Schema
import com.expediagroup.graphql.server.ktor.subscriptions.DefaultKtorGraphQLSubscriptionContextFactory
import com.expediagroup.graphql.server.ktor.subscriptions.DefaultKtorGraphQLSubscriptionHooks
import com.expediagroup.graphql.server.ktor.subscriptions.DefaultKtorGraphQLSubscriptionRequestParser
import com.expediagroup.graphql.server.ktor.subscriptions.KtorGraphQLSubscriptionContextFactory
import com.expediagroup.graphql.server.ktor.subscriptions.KtorGraphQLSubscriptionHooks
import com.expediagroup.graphql.server.ktor.subscriptions.KtorGraphQLSubscriptionRequestParser
import com.expediagroup.graphql.server.operations.Mutation
import com.expediagroup.graphql.server.operations.Query
import com.expediagroup.graphql.server.operations.Subscription
Expand Down Expand Up @@ -85,6 +91,12 @@ import kotlin.reflect.KClass
* contextFactory = DefaultKtorGraphQLContextFactory()
* jacksonConfiguration = { }
* requestParser = KtorGraphQLRequestParser(jacksonObjectMapper())
* subscriptions {
* requestParser = DefaultKtorGraphQLSubscriptionRequestParser()
* contextFactory = DefaultKtorGraphQLSubscriptionContextFactory()
* hooks = DefaultKtorGraphQLSubscriptionHooks()
* connectionInitTimeout = 60_000
* }
* }
* ```
*/
Expand Down Expand Up @@ -255,6 +267,23 @@ class GraphQLConfiguration(config: ApplicationConfig) {
var jacksonConfiguration: ObjectMapper.() -> Unit = {}
/** Custom request parser */
var requestParser: KtorGraphQLRequestParser = KtorGraphQLRequestParser(jacksonObjectMapper().apply(jacksonConfiguration))
/** GraphQL WS subscription configuration */
val subscriptions: KtorSubscriptionConfiguration = KtorSubscriptionConfiguration(config)
fun subscriptions(subscriptionConfig: KtorSubscriptionConfiguration.() -> Unit) {
subscriptions.apply(subscriptionConfig)
}
}

/** Configuration for configuring GraphQL Web Socket subscription server */
class KtorSubscriptionConfiguration(config: ApplicationConfig) {
/** Custom WebSocket subscription parser */
var requestParser: KtorGraphQLSubscriptionRequestParser = DefaultKtorGraphQLSubscriptionRequestParser()
/** Custom WebSocket subscription context factory */
var contextFactory: KtorGraphQLSubscriptionContextFactory = DefaultKtorGraphQLSubscriptionContextFactory()
/** Custom WebSocket subscription hooks */
var hooks: KtorGraphQLSubscriptionHooks = DefaultKtorGraphQLSubscriptionHooks()
/** Server timeout between establishing web socket connection and receiving connection-init message */
var connectionInitTimeout: Long = config.tryGetString("graphql.server.subscription.connectionInitTimeout")?.toLongOrNull() ?: 60_000
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package com.expediagroup.graphql.server.ktor

import com.expediagroup.graphql.generator.extensions.print
import com.expediagroup.graphql.server.ktor.subscriptions.KtorGraphQLSubscriptionHandler
import com.expediagroup.graphql.server.execution.subscription.GRAPHQL_WS_PROTOCOL
import com.fasterxml.jackson.databind.ObjectMapper
import io.ktor.http.ContentType
import io.ktor.serialization.jackson.jackson
Expand All @@ -30,7 +30,9 @@ import io.ktor.server.routing.Route
import io.ktor.server.routing.application
import io.ktor.server.routing.get
import io.ktor.server.routing.post
import io.ktor.server.websocket.application
import io.ktor.server.websocket.webSocket
import kotlinx.coroutines.flow.collect

/**
* Configures GraphQL GET route
Expand Down Expand Up @@ -76,20 +78,15 @@ fun Route.graphQLPostRoute(endpoint: String = "graphql", streamingResponse: Bool
* Configures GraphQL subscriptions route
*
* @param endpoint GraphQL server subscriptions endpoint, defaults to 'subscriptions'
* @param handlerOverride Alternative KtorGraphQLSubscriptionHandler to handle subscriptions logic
*/
fun Route.graphQLSubscriptionsRoute(
endpoint: String = "subscriptions",
protocol: String? = "graphql-transport-ws",
handlerOverride: KtorGraphQLSubscriptionHandler? = null,
endpoint: String = "subscriptions"
) {
val handler = handlerOverride ?: run {
val graphQLPlugin = this.application.plugin(GraphQL)
graphQLPlugin.subscriptionsHandler
}

webSocket(path = endpoint, protocol = protocol) {
handler.handle(this)
webSocket(path = endpoint, protocol = GRAPHQL_WS_PROTOCOL) {
this.application.plugin(GraphQL)
.subscriptionServer
.handleSubscription(this)
.collect()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,12 @@

package com.expediagroup.graphql.server.ktor.subscriptions

import com.expediagroup.graphql.server.execution.subscription.GraphQLSubscriptionContextFactory
import io.ktor.server.websocket.WebSocketServerSession

interface KtorGraphQLSubscriptionHandler {
suspend fun handle(session: WebSocketServerSession)
}
/**
* Ktor specific version of WebSocket subscription context factory.
*/
interface KtorGraphQLSubscriptionContextFactory : GraphQLSubscriptionContextFactory<WebSocketServerSession>

class DefaultKtorGraphQLSubscriptionContextFactory : KtorGraphQLSubscriptionContextFactory
Original file line number Diff line number Diff line change
Expand Up @@ -13,60 +13,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.expediagroup.graphql.server.ktor.subscriptions

import com.expediagroup.graphql.generator.extensions.toGraphQLContext
import com.expediagroup.graphql.server.types.GraphQLRequest
import graphql.GraphQLContext
import com.expediagroup.graphql.server.execution.subscription.GraphQLSubscriptionHooks
import io.ktor.server.websocket.WebSocketServerSession

/**
* GraphQL subscription lifecycle hooks.
* Allows API user to add custom callbacks on subscription events, e.g. to add validation, context tracking etc.
*
* Inspired by Apollo Subscription Server Lifecycle Events.
* https://www.apollographql.com/docs/graphql-subscriptions/lifecycle-events/
* Ktor specific version of GraphQL WebSocket subscription hooks.
*/
interface KtorGraphQLSubscriptionHooks {
/**
* Allows validation of connectionParams prior to starting the connection.
* You can reject the connection by throwing an exception.
*/
fun onConnect(
connectionParams: Any?,
session: WebSocketServerSession,
): GraphQLContext = emptyMap<Any, Any>().toGraphQLContext()

/**
* Called when the client executes a GraphQL operation.
* The context here is what returned from [onConnect] earlier.
*/
fun onOperation(
operationId: String,
payload: GraphQLRequest,
session: WebSocketServerSession,
graphQLContext: GraphQLContext,
): Unit = Unit

/**
* Called when client unsubscribes
*/
fun onOperationComplete(
operationId: String,
session: WebSocketServerSession,
graphQLContext: GraphQLContext,
): Unit = Unit
interface KtorGraphQLSubscriptionHooks : GraphQLSubscriptionHooks<WebSocketServerSession>

/**
* Called when the client disconnects
*/
fun onDisconnect(
session: WebSocketServerSession,
graphQLContext: GraphQLContext
): Unit = Unit
}

/**
* Default implementation of lifecycle event hooks (No-op).
*/
open class DefaultKtorGraphQLSubscriptionHooks : KtorGraphQLSubscriptionHooks
class DefaultKtorGraphQLSubscriptionHooks : KtorGraphQLSubscriptionHooks
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright 2023 Expedia, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.expediagroup.graphql.server.ktor.subscriptions

import com.expediagroup.graphql.server.execution.subscription.GraphQLSubscriptionRequestParser
import com.expediagroup.graphql.server.types.GraphQLSubscriptionStatus
import io.ktor.server.websocket.WebSocketServerSession
import io.ktor.websocket.CloseReason
import io.ktor.websocket.Frame
import io.ktor.websocket.close
import io.ktor.websocket.readText
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.isActive
import org.slf4j.Logger
import org.slf4j.LoggerFactory

/**
* Ktor specific version of WebSocket subscription request parser.
*/
interface KtorGraphQLSubscriptionRequestParser : GraphQLSubscriptionRequestParser<WebSocketServerSession>

class DefaultKtorGraphQLSubscriptionRequestParser : KtorGraphQLSubscriptionRequestParser {

private val logger: Logger = LoggerFactory.getLogger(DefaultKtorGraphQLSubscriptionRequestParser::class.java)

override suspend fun parseRequestFlow(session: WebSocketServerSession): Flow<String> =
flow {
try {
while (session.isActive) {
val frame = session.incoming.receive()
if (frame !is Frame.Text) {
val invalidStatus = GraphQLSubscriptionStatus.INVALID_MESSAGE
session.close(CloseReason(code = invalidStatus.code.toShort(), message = invalidStatus.reason))
continue
} else {
val messageString = frame.readText()
emit(messageString)
}
}
} catch (e: ClosedReceiveChannelException) {
logger.debug("Client disconnected")
}
}
}
Loading

0 comments on commit 74ddab7

Please sign in to comment.