diff --git a/ktor-server/ktor-server-plugins/ktor-server-sse/api/ktor-server-sse.api b/ktor-server/ktor-server-plugins/ktor-server-sse/api/ktor-server-sse.api index ccc9395d0d5..917809b1e71 100644 --- a/ktor-server/ktor-server-plugins/ktor-server-sse/api/ktor-server-sse.api +++ b/ktor-server/ktor-server-plugins/ktor-server-sse/api/ktor-server-sse.api @@ -1,3 +1,11 @@ +public final class io/ktor/server/sse/Heartbeat { + public fun ()V + public final fun getDuration-UwyO8pc ()J + public final fun getEvent ()Lio/ktor/sse/ServerSentEvent; + public final fun setDuration-LRDsOJo (J)V + public final fun setEvent (Lio/ktor/sse/ServerSentEvent;)V +} + public final class io/ktor/server/sse/RoutingKt { public static final fun sse (Lio/ktor/server/routing/Route;Ljava/lang/String;Lkotlin/jvm/functions/Function2;)V public static final fun sse (Lio/ktor/server/routing/Route;Ljava/lang/String;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;)V @@ -33,6 +41,10 @@ public final class io/ktor/server/sse/ServerSSESession$DefaultImpls { public static synthetic fun send$default (Lio/ktor/server/sse/ServerSSESession;Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/lang/Long;Ljava/lang/String;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object; } +public final class io/ktor/server/sse/ServerSSESessionKt { + public static final fun heartbeat (Lio/ktor/server/sse/ServerSSESession;Lkotlin/jvm/functions/Function1;)V +} + public abstract interface class io/ktor/server/sse/ServerSSESessionWithSerialization : io/ktor/server/sse/ServerSSESession { public abstract fun getSerializer ()Lkotlin/jvm/functions/Function2; } diff --git a/ktor-server/ktor-server-plugins/ktor-server-sse/api/ktor-server-sse.klib.api b/ktor-server/ktor-server-plugins/ktor-server-sse/api/ktor-server-sse.klib.api index 93d8bbf2b19..95925c18427 100644 --- a/ktor-server/ktor-server-plugins/ktor-server-sse/api/ktor-server-sse.klib.api +++ b/ktor-server/ktor-server-plugins/ktor-server-sse/api/ktor-server-sse.klib.api @@ -20,6 +20,17 @@ abstract interface io.ktor.server.sse/ServerSSESessionWithSerialization : io.kto abstract fun (): kotlin/Function2 // io.ktor.server.sse/ServerSSESessionWithSerialization.serializer.|(){}[0] } +final class io.ktor.server.sse/Heartbeat { // io.ktor.server.sse/Heartbeat|null[0] + constructor () // io.ktor.server.sse/Heartbeat.|(){}[0] + + final var duration // io.ktor.server.sse/Heartbeat.duration|{}duration[0] + final fun (): kotlin.time/Duration // io.ktor.server.sse/Heartbeat.duration.|(){}[0] + final fun (kotlin.time/Duration) // io.ktor.server.sse/Heartbeat.duration.|(kotlin.time.Duration){}[0] + final var event // io.ktor.server.sse/Heartbeat.event|{}event[0] + final fun (): io.ktor.sse/ServerSentEvent // io.ktor.server.sse/Heartbeat.event.|(){}[0] + final fun (io.ktor.sse/ServerSentEvent) // io.ktor.server.sse/Heartbeat.event.|(io.ktor.sse.ServerSentEvent){}[0] +} + final class io.ktor.server.sse/SSEServerContent : io.ktor.http.content/OutgoingContent.WriteChannelContent { // io.ktor.server.sse/SSEServerContent|null[0] constructor (io.ktor.server.application/ApplicationCall, kotlin.coroutines/SuspendFunction1) // io.ktor.server.sse/SSEServerContent.|(io.ktor.server.application.ApplicationCall;kotlin.coroutines.SuspendFunction1){}[0] constructor (io.ktor.server.application/ApplicationCall, kotlin.coroutines/SuspendFunction1, kotlin/Function2? = ...) // io.ktor.server.sse/SSEServerContent.|(io.ktor.server.application.ApplicationCall;kotlin.coroutines.SuspendFunction1;kotlin.Function2?){}[0] @@ -44,6 +55,7 @@ final fun (io.ktor.server.routing/Route).io.ktor.server.sse/sse(kotlin.coroutine final fun (io.ktor.server.routing/Route).io.ktor.server.sse/sse(kotlin/Function2, kotlin.coroutines/SuspendFunction1) // io.ktor.server.sse/sse|sse@io.ktor.server.routing.Route(kotlin.Function2;kotlin.coroutines.SuspendFunction1){}[0] final fun (io.ktor.server.routing/Route).io.ktor.server.sse/sse(kotlin/String, kotlin.coroutines/SuspendFunction1) // io.ktor.server.sse/sse|sse@io.ktor.server.routing.Route(kotlin.String;kotlin.coroutines.SuspendFunction1){}[0] final fun (io.ktor.server.routing/Route).io.ktor.server.sse/sse(kotlin/String, kotlin/Function2, kotlin.coroutines/SuspendFunction1) // io.ktor.server.sse/sse|sse@io.ktor.server.routing.Route(kotlin.String;kotlin.Function2;kotlin.coroutines.SuspendFunction1){}[0] +final fun (io.ktor.server.sse/ServerSSESession).io.ktor.server.sse/heartbeat(kotlin/Function1) // io.ktor.server.sse/heartbeat|heartbeat@io.ktor.server.sse.ServerSSESession(kotlin.Function1){}[0] final suspend inline fun <#A: reified kotlin/Any> (io.ktor.server.sse/ServerSSESessionWithSerialization).io.ktor.server.sse/send(#A) // io.ktor.server.sse/send|send@io.ktor.server.sse.ServerSSESessionWithSerialization(0:0){0§}[0] final suspend inline fun <#A: reified kotlin/Any> (io.ktor.server.sse/ServerSSESessionWithSerialization).io.ktor.server.sse/send(#A? = ..., kotlin/String? = ..., kotlin/String? = ..., kotlin/Long? = ..., kotlin/String? = ...) // io.ktor.server.sse/send|send@io.ktor.server.sse.ServerSSESessionWithSerialization(0:0?;kotlin.String?;kotlin.String?;kotlin.Long?;kotlin.String?){0§}[0] final suspend inline fun <#A: reified kotlin/Any> (io.ktor.server.sse/ServerSSESessionWithSerialization).io.ktor.server.sse/send(io.ktor.sse/TypedServerSentEvent<#A>) // io.ktor.server.sse/send|send@io.ktor.server.sse.ServerSSESessionWithSerialization(io.ktor.sse.TypedServerSentEvent<0:0>){0§}[0] diff --git a/ktor-server/ktor-server-plugins/ktor-server-sse/common/src/io/ktor/server/sse/SSEServerContent.kt b/ktor-server/ktor-server-plugins/ktor-server-sse/common/src/io/ktor/server/sse/SSEServerContent.kt index 45b260c7411..928c78cd28e 100644 --- a/ktor-server/ktor-server-plugins/ktor-server-sse/common/src/io/ktor/server/sse/SSEServerContent.kt +++ b/ktor-server/ktor-server-plugins/ktor-server-sse/common/src/io/ktor/server/sse/SSEServerContent.kt @@ -53,6 +53,8 @@ public class SSEServerContent( session?.handle() } } finally { + val heartbeatJob = call.attributes.getOrNull(heartbeatJobKey) + heartbeatJob?.cancel() session?.close() } } diff --git a/ktor-server/ktor-server-plugins/ktor-server-sse/common/src/io/ktor/server/sse/ServerSSESession.kt b/ktor-server/ktor-server-plugins/ktor-server-sse/common/src/io/ktor/server/sse/ServerSSESession.kt index 46627989635..57ca84f8355 100644 --- a/ktor-server/ktor-server-plugins/ktor-server-sse/common/src/io/ktor/server/sse/ServerSSESession.kt +++ b/ktor-server/ktor-server-plugins/ktor-server-sse/common/src/io/ktor/server/sse/ServerSSESession.kt @@ -6,9 +6,12 @@ package io.ktor.server.sse import io.ktor.server.application.* import io.ktor.sse.* +import io.ktor.util.* import io.ktor.util.reflect.* import io.ktor.websocket.* import kotlinx.coroutines.* +import kotlin.time.Duration +import kotlin.time.Duration.Companion.seconds /** * Represents a server-side Server-Sent Events (SSE) session. @@ -130,3 +133,36 @@ public suspend inline fun ServerSSESessionWithSerialization.se public suspend inline fun ServerSSESessionWithSerialization.send(data: T) { send(ServerSentEvent(serializer(typeInfo(), data))) } + +/** + * Starts a heartbeat for the ServerSSESession. + * + * The heartbeat will send the specified [Heartbeat.event] at the specified [Heartbeat.duration] interval + * as long as the session is active. + * + * @param heartbeatConfig a lambda that configures the [Heartbeat] object used for the heartbeat. + */ +public fun ServerSSESession.heartbeat(heartbeatConfig: Heartbeat.() -> Unit) { + val heartbeat = Heartbeat().apply(heartbeatConfig) + val heartbeatJob = Job(call.coroutineContext[Job]) + launch(heartbeatJob + CoroutineName("sse-heartbeat")) { + while (true) { + send(heartbeat.event) + delay(heartbeat.duration) + } + } + call.attributes.put(heartbeatJobKey, heartbeatJob) +} + +internal val heartbeatJobKey = AttributeKey("HeartbeatJobAttributeKey") + +/** + * Represents a heartbeat configuration for a [ServerSSESession]. + * + * @property duration the duration between heartbeat events, default is 30 seconds. + * @property event the [ServerSentEvent] to be sent as the heartbeat, default is a [ServerSentEvent] with the comment "heartbeat". + */ +public class Heartbeat { + public var duration: Duration = 30.seconds + public var event: ServerSentEvent = ServerSentEvent(comments = "heartbeat") +} diff --git a/ktor-server/ktor-server-plugins/ktor-server-sse/common/test/io/ktor/server/sse/ServerSentEventsTest.kt b/ktor-server/ktor-server-plugins/ktor-server-sse/common/test/io/ktor/server/sse/ServerSentEventsTest.kt index f9200fd40ca..5751b6e40a2 100644 --- a/ktor-server/ktor-server-plugins/ktor-server-sse/common/test/io/ktor/server/sse/ServerSentEventsTest.kt +++ b/ktor-server/ktor-server-plugins/ktor-server-sse/common/test/io/ktor/server/sse/ServerSentEventsTest.kt @@ -1,6 +1,6 @@ /* -* Copyright 2014-2023 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. -*/ + * Copyright 2014-2024 JetBrains s.r.o and contributors. Use of this source code is governed by the Apache 2.0 license. + */ package io.ktor.server.sse @@ -11,11 +11,19 @@ import io.ktor.client.statement.* import io.ktor.http.* import io.ktor.server.testing.* import io.ktor.sse.* -import kotlinx.coroutines.* -import kotlinx.coroutines.flow.* -import kotlinx.serialization.* -import kotlinx.serialization.json.* -import kotlin.test.* +import kotlinx.coroutines.cancel +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.collectIndexed +import kotlinx.coroutines.flow.single +import kotlinx.coroutines.launch +import kotlinx.coroutines.withTimeout +import kotlinx.serialization.Serializable +import kotlinx.serialization.json.Json +import kotlinx.serialization.serializer +import kotlin.test.Test +import kotlin.test.assertContains +import kotlin.test.assertEquals +import kotlin.time.Duration.Companion.milliseconds class ServerSentEventsTest { @@ -291,6 +299,42 @@ class ServerSentEventsTest { ) } + @Test + fun testHeartbeat() = testApplication { + install(SSE) + routing { + sse { + heartbeat { + duration = 10.milliseconds + event = ServerSentEvent("heartbeat") + } + + repeat(4) { + send("Hello") + delay(10.milliseconds) + } + } + } + + val client = createSseClient() + + var hellos = 0 + var heartbeats = 0 + withTimeout(5_000) { + client.sse { + incoming.collect { event -> + when (event.data) { + "Hello" -> hellos++ + "heartbeat" -> heartbeats++ + } + if (hellos > 3 && heartbeats > 3) { + cancel() + } + } + } + } + } + private fun ApplicationTestBuilder.createSseClient(): HttpClient { val client = createClient { install(io.ktor.client.plugins.sse.SSE)