Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

send headers when failure is null #335

Merged
merged 5 commits into from
Apr 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion stub/src/main/java/io/grpc/kotlin/ServerCalls.kt
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ object ServerCalls {
}.exceptionOrNull()
// check headers again once we're done collecting the response flow - if we received
// no elements or threw an exception, then we wouldn't have sent them
if (headersSent.compareAndSet(false, true)) {
if (failure == null && headersSent.compareAndSet(false, true)) {
mutex.withLock {
call.sendHeaders(GrpcMetadata())
}
Expand Down
18 changes: 17 additions & 1 deletion stub/src/test/java/io/grpc/kotlin/AbstractCallsTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,10 @@ abstract class AbstractCallsTest {
fun makeChannel(impl: BindableService, vararg interceptors: ServerInterceptor): ManagedChannel =
makeChannel(ServerInterceptors.intercept(impl, *interceptors))

fun makeChannel(serverServiceDefinition: ServerServiceDefinition): ManagedChannel {
fun makeChannel(
serverServiceDefinition: ServerServiceDefinition,
serviceConfig: Map<String, Any> = emptyMap()
): ManagedChannel {
val serverName = InProcessServerBuilder.generateName()

grpcCleanup.register(
Expand All @@ -168,6 +171,8 @@ abstract class AbstractCallsTest {
return grpcCleanup.register(
InProcessChannelBuilder
.forName(serverName)
.enableRetry()
.defaultServiceConfig(serviceConfig)
.run { this as io.grpc.ManagedChannelBuilder<*> } // workaround b/123879662
.executor(executor)
.build()
Expand All @@ -189,6 +194,17 @@ abstract class AbstractCallsTest {
return makeChannel(ServerInterceptors.intercept(builder.build(), *interceptors))
}

fun makeChannel(
serverServiceDefinition: ServerServiceDefinition,
config: Map<String, Any> = emptyMap(),
vararg interceptors: ServerInterceptor
): ManagedChannel {
return makeChannel(
ServerInterceptors.intercept(serverServiceDefinition, *interceptors),
config
)
}

fun <R> runBlocking(block: suspend CoroutineScope.() -> R): Unit =
kotlinx.coroutines.runBlocking(context) {
block()
Expand Down
75 changes: 64 additions & 11 deletions stub/src/test/java/io/grpc/kotlin/ServerCallsTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,12 @@ package io.grpc.kotlin

import com.google.common.truth.Truth.assertThat
import com.google.common.truth.extensions.proto.ProtoTruth.assertThat
import io.grpc.CallOptions
import io.grpc.ClientCall
import io.grpc.Context
import io.grpc.Contexts
import io.grpc.Metadata
import io.grpc.ServerCall
import io.grpc.ServerCallHandler
import io.grpc.ServerInterceptor
import io.grpc.Status
import io.grpc.StatusException
import io.grpc.StatusRuntimeException
import io.grpc.*
import io.grpc.examples.helloworld.GreeterGrpc
import io.grpc.examples.helloworld.HelloReply
import io.grpc.examples.helloworld.HelloRequest
import io.grpc.examples.helloworld.GreeterGrpcKt.GreeterCoroutineStub
import io.grpc.examples.helloworld.GreeterGrpcKt.GreeterCoroutineImplBase
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
Expand Down Expand Up @@ -893,6 +885,7 @@ class ServerCallsTest : AbstractCallsTest() {
}

override fun onClose(status: Status, trailers: Metadata) {
headersReceived.complete()
closeStatus.complete(status)
}
},
Expand Down Expand Up @@ -958,4 +951,64 @@ class ServerCallsTest : AbstractCallsTest() {
val status = closeStatus.await()
assertThat(status.code).isEqualTo(Status.Code.OK)
}

@Test
fun coroutinesServerRetry() {
runBlocking {
val retryCount = 5
val config = getRetryingServiceConfig(retryCount.toDouble())
val coroutinesServer = object : GreeterCoroutineImplBase() {
var count = 0
private set

override suspend fun sayHello(request: HelloRequest): HelloReply {
count++
throw StatusRuntimeException(Status.UNKNOWN)
}
}

val channel = makeChannel(coroutinesServer.bindService(), config)

val coroutineStub = GreeterCoroutineStub(channel)

try {
coroutineStub.sayHello(helloRequest("hello"))
} catch (e: Exception) {
assertThat(coroutinesServer.count).isEqualTo(retryCount)
}
}
}

private fun getRetryingServiceConfig(
retryCount: Double
): Map<String, Any> {
val config = hashMapOf<String, Any>()

val name = mutableListOf<Map<String, Any>>()
name.add(
mapOf(
"service" to "helloworld.Greeter",
"method" to "SayHello"
)
)

val retryPolicy = hashMapOf<String, Any>()
retryPolicy["maxAttempts"] = retryCount
retryPolicy["initialBackoff"] = "0.5s"
retryPolicy["maxBackoff"] = "30s"
retryPolicy["backoffMultiplier"] = 2.0
retryPolicy["retryableStatusCodes"] = listOf("UNKNOWN")

val methodConfig = mutableListOf<Map<String, Any>>()
val serviceConfig = hashMapOf<String, Any>()

serviceConfig["name"] = name
serviceConfig["retryPolicy"] = retryPolicy

methodConfig.add(serviceConfig)

config["methodConfig"] = methodConfig

return config
}
}