diff --git a/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java b/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java
index 3ba1bf563ef..18255ddd6d4 100644
--- a/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java
+++ b/stub/src/main/java/io/grpc/stub/ServerCallStreamObserver.java
@@ -16,6 +16,8 @@
package io.grpc.stub;
+import io.grpc.ExperimentalApi;
+
/**
* A refinement of {@link CallStreamObserver} to allows for interaction with call
* cancellation events on the server side.
@@ -143,4 +145,26 @@ public void disableAutoRequest() {
*/
@Override
public abstract void setMessageCompression(boolean enable);
+
+ /**
+ * Sets a {@link Runnable} to be executed when the call is closed cleanly from the server's
+ * point of view: either {@link #onCompleted()} or {@link #onError(Throwable)} has been called,
+ * all the messages and trailing metadata have been sent and the stream has been closed. Note
+ * however that the client still may have not received all the messages due to network delay,
+ * client crashes, and cancellation races.
+ *
+ *
Exactly one of {@code onCloseHandler} and {@code onCancelHandler} is guaranteed to be called
+ * when the RPC terminates.
+ *
+ * It is guaranteed that execution of {@code onCloseHandler} is serialized with calls to
+ * the 'inbound' {@link StreamObserver}. That also means that the callback will be delayed if
+ * other callbacks are running.
+ *
+ * This method may only be called during the initial call to the application, before the
+ * service returns its {@link StreamObserver request observer}.
+ *
+ * @param onCloseHandler to execute when the call has been closed cleanly.
+ */
+ @ExperimentalApi("https://github.com/grpc/grpc-java/issues/8467")
+ public abstract void setOnCloseHandler(Runnable onCloseHandler);
}
diff --git a/stub/src/main/java/io/grpc/stub/ServerCalls.java b/stub/src/main/java/io/grpc/stub/ServerCalls.java
index ba08139b716..89e738adab1 100644
--- a/stub/src/main/java/io/grpc/stub/ServerCalls.java
+++ b/stub/src/main/java/io/grpc/stub/ServerCalls.java
@@ -206,6 +206,13 @@ public void onReady() {
responseObserver.onReadyHandler.run();
}
}
+
+ @Override
+ public void onComplete() {
+ if (responseObserver.onCloseHandler != null) {
+ responseObserver.onCloseHandler.run();
+ }
+ }
}
}
@@ -291,6 +298,13 @@ public void onReady() {
responseObserver.onReadyHandler.run();
}
}
+
+ @Override
+ public void onComplete() {
+ if (responseObserver.onCloseHandler != null) {
+ responseObserver.onCloseHandler.run();
+ }
+ }
}
}
@@ -320,6 +334,7 @@ private static final class ServerCallStreamObserverImpl
private Runnable onCancelHandler;
private boolean aborted = false;
private boolean completed = false;
+ private Runnable onCloseHandler;
// Non private to avoid synthetic class
ServerCallStreamObserverImpl(ServerCall call, boolean serverStreamingOrBidi) {
@@ -423,6 +438,14 @@ public void disableAutoRequest() {
public void request(int count) {
call.request(count);
}
+
+ @Override
+ public void setOnCloseHandler(Runnable onCloseHandler) {
+ checkState(!frozen, "Cannot alter onCloseHandler after initialization. May only be called "
+ + "during the initial call to the application, before the service returns its "
+ + "StreamObserver");
+ this.onCloseHandler = onCloseHandler;
+ }
}
/**
diff --git a/stub/src/test/java/io/grpc/stub/ServerCallsTest.java b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java
index a2a1ef93961..7227d26c5b8 100644
--- a/stub/src/test/java/io/grpc/stub/ServerCallsTest.java
+++ b/stub/src/test/java/io/grpc/stub/ServerCallsTest.java
@@ -199,6 +199,53 @@ public StreamObserver invoke(StreamObserver responseObserver)
callObserver.get().onCompleted();
}
+ @Test
+ public void onCloseHandlerCalledIfSetInStreamingClientCall() throws Exception {
+ final AtomicBoolean onCloseHandlerCalled = new AtomicBoolean();
+ ServerCallHandler callHandler = ServerCalls.asyncBidiStreamingCall(
+ new ServerCalls.BidiStreamingMethod() {
+ @Override
+ public StreamObserver invoke(StreamObserver responseObserver) {
+ ServerCallStreamObserver serverCallObserver =
+ (ServerCallStreamObserver) responseObserver;
+ serverCallObserver.setOnCloseHandler(new Runnable() {
+ @Override
+ public void run() {
+ onCloseHandlerCalled.set(true);
+ }
+ });
+ return new ServerCalls.NoopStreamObserver<>();
+ }
+ });
+ ServerCall.Listener callListener = callHandler.startCall(serverCall, new Metadata());
+ callListener.onComplete();
+ assertTrue(onCloseHandlerCalled.get());
+ }
+
+ @Test
+ public void onCloseHandlerCalledIfSetInUnaryClientCall() throws Exception {
+ final AtomicBoolean onCloseHandlerCalled = new AtomicBoolean();
+ ServerCallHandler callHandler = ServerCalls.asyncServerStreamingCall(
+ new ServerCalls.ServerStreamingMethod() {
+ @Override
+ public void invoke(Integer request, StreamObserver responseObserver) {
+ ServerCallStreamObserver serverCallObserver =
+ (ServerCallStreamObserver) responseObserver;
+ serverCallObserver.setOnCloseHandler(new Runnable() {
+ @Override
+ public void run() {
+ onCloseHandlerCalled.set(true);
+ }
+ });
+ }
+ });
+ ServerCall.Listener callListener = callHandler.startCall(serverCall, new Metadata());
+ callListener.onMessage(0);
+ callListener.onHalfClose();
+ callListener.onComplete();
+ assertTrue(onCloseHandlerCalled.get());
+ }
+
@Test
public void cannotSetOnCancelHandlerAfterServiceInvocation() throws Exception {
final AtomicReference> callObserver =
@@ -255,6 +302,31 @@ public void run() {
}
}
+ @Test
+ public void cannotSetOnCloseHandlerAfterServiceInvocation() throws Exception {
+ final AtomicReference> callObserver = new AtomicReference<>();
+ ServerCallHandler callHandler = ServerCalls.asyncBidiStreamingCall(
+ new ServerCalls.BidiStreamingMethod() {
+ @Override
+ public StreamObserver invoke(StreamObserver responseObserver) {
+ callObserver.set((ServerCallStreamObserver) responseObserver);
+ return new ServerCalls.NoopStreamObserver<>();
+ }
+ });
+ ServerCall.Listener callListener = callHandler.startCall(serverCall, new Metadata());
+ callListener.onMessage(1);
+ try {
+ callObserver.get().setOnCloseHandler(new Runnable() {
+ @Override
+ public void run() {
+ }
+ });
+ fail("Cannot set onReady after service invocation");
+ } catch (IllegalStateException expected) {
+ // Expected
+ }
+ }
+
@Test
public void cannotDisableAutoRequestAfterServiceInvocation() throws Exception {
final AtomicReference> callObserver =