diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightServer.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightServer.java index d873f7d2828d0..dc545c131828a 100644 --- a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightServer.java +++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/FlightServer.java @@ -42,6 +42,7 @@ import org.apache.arrow.flight.auth2.Auth2Constants; import org.apache.arrow.flight.auth2.CallHeaderAuthenticator; import org.apache.arrow.flight.auth2.ServerCallHeaderAuthMiddleware; +import org.apache.arrow.flight.grpc.ServerBackpressureThresholdInterceptor; import org.apache.arrow.flight.grpc.ServerInterceptorAdapter; import org.apache.arrow.flight.grpc.ServerInterceptorAdapter.KeyFactory; import org.apache.arrow.memory.BufferAllocator; @@ -79,6 +80,9 @@ public class FlightServer implements AutoCloseable { /** The maximum size of an individual gRPC message. This effectively disables the limit. */ static final int MAX_GRPC_MESSAGE_SIZE = Integer.MAX_VALUE; + /** The default number of bytes that can be queued on an output stream before blocking. */ + public static final int DEFAULT_BACKPRESSURE_THRESHOLD = 10 * 1024 * 1024; // 10MB + /** Create a new instance from a gRPC server. For internal use only. */ private FlightServer(Location location, Server server, ExecutorService grpcExecutor) { this.location = location; @@ -179,6 +183,7 @@ public static final class Builder { private CallHeaderAuthenticator headerAuthenticator = CallHeaderAuthenticator.NO_OP; private ExecutorService executor = null; private int maxInboundMessageSize = MAX_GRPC_MESSAGE_SIZE; + private int backpressureThreshold = DEFAULT_BACKPRESSURE_THRESHOLD; private InputStream certChain; private InputStream key; private InputStream mTlsCACert; @@ -300,6 +305,7 @@ public FlightServer build() { .addService( ServerInterceptors.intercept( flightService, + new ServerBackpressureThresholdInterceptor(backpressureThreshold), new ServerAuthInterceptor(authHandler))); // Allow hooking into the gRPC builder. This is not guaranteed to be available on all Arrow versions or @@ -336,6 +342,15 @@ public Builder maxInboundMessageSize(int maxMessageSize) { return this; } + /** + * Set the number of bytes that may be queued on a server output stream before writes are blocked. + */ + public Builder backpressureThreshold(int backpressureThreshold) { + Preconditions.checkArgument(backpressureThreshold > 0); + this.backpressureThreshold = backpressureThreshold; + return this; + } + /** * A small utility function to ensure that InputStream attributes. * are closed if they are not null diff --git a/java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/ServerBackpressureThresholdInterceptor.java b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/ServerBackpressureThresholdInterceptor.java new file mode 100644 index 0000000000000..bd42fbc8ad6a4 --- /dev/null +++ b/java/flight/flight-core/src/main/java/org/apache/arrow/flight/grpc/ServerBackpressureThresholdInterceptor.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.flight.grpc; + +import io.grpc.Metadata; +import io.grpc.ServerCall; +import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; + +/** + * An interceptor for specifying the number of bytes that can be queued before a call with an output stream + * gets blocked by backpressure. + */ +public class ServerBackpressureThresholdInterceptor implements ServerInterceptor { + + private final int numBytes; + + public ServerBackpressureThresholdInterceptor(int numBytes) { + this.numBytes = numBytes; + } + + @Override + public ServerCall.Listener interceptCall(ServerCall call, Metadata headers, + ServerCallHandler next) { + call.setOnReadyThreshold(numBytes); + return next.startCall(call, headers); + } +} diff --git a/java/pom.xml b/java/pom.xml index 95b27922eafa9..9892061677d09 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -34,7 +34,7 @@ 2.0.11 33.0.0-jre 4.1.108.Final - 1.62.2 + 1.63.0 3.23.1 2.17.0 3.4.0