Skip to content

Commit

Permalink
[Java][FlightRPC] Support configuring backpressure threshold
Browse files Browse the repository at this point in the history
* Update to grpc-java 1.63.0
* Add to FlightServer.Builder an option to set the number of bytes queued before blocking
due to backpressure. Set the default to 10MB instead of gRPC's default of 64K.
* Add a ServerInterceptor for automating setting the backpressure threshold on ServerCalls.
  • Loading branch information
jduo committed Apr 6, 2024
1 parent 6aa3321 commit dd55c79
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.arrow.flight.auth2.Auth2Constants;
import org.apache.arrow.flight.auth2.CallHeaderAuthenticator;
import org.apache.arrow.flight.auth2.ServerCallHeaderAuthMiddleware;
import org.apache.arrow.flight.grpc.ServerBackpressureThresholdInterceptor;
import org.apache.arrow.flight.grpc.ServerInterceptorAdapter;
import org.apache.arrow.flight.grpc.ServerInterceptorAdapter.KeyFactory;
import org.apache.arrow.memory.BufferAllocator;
Expand Down Expand Up @@ -79,6 +80,9 @@ public class FlightServer implements AutoCloseable {
/** The maximum size of an individual gRPC message. This effectively disables the limit. */
static final int MAX_GRPC_MESSAGE_SIZE = Integer.MAX_VALUE;

/** The default number of bytes that can be queued on an output stream before blocking. */
public static final int DEFAULT_BACKPRESSURE_THRESHOLD = 10 * 1024 * 1024; // 10MB

/** Create a new instance from a gRPC server. For internal use only. */
private FlightServer(Location location, Server server, ExecutorService grpcExecutor) {
this.location = location;
Expand Down Expand Up @@ -179,6 +183,7 @@ public static final class Builder {
private CallHeaderAuthenticator headerAuthenticator = CallHeaderAuthenticator.NO_OP;
private ExecutorService executor = null;
private int maxInboundMessageSize = MAX_GRPC_MESSAGE_SIZE;
private int backpressureThreshold = DEFAULT_BACKPRESSURE_THRESHOLD;
private InputStream certChain;
private InputStream key;
private InputStream mTlsCACert;
Expand Down Expand Up @@ -300,6 +305,7 @@ public FlightServer build() {
.addService(
ServerInterceptors.intercept(
flightService,
new ServerBackpressureThresholdInterceptor(backpressureThreshold),
new ServerAuthInterceptor(authHandler)));

// Allow hooking into the gRPC builder. This is not guaranteed to be available on all Arrow versions or
Expand Down Expand Up @@ -336,6 +342,15 @@ public Builder maxInboundMessageSize(int maxMessageSize) {
return this;
}

/**
* Set the number of bytes that may be queued on a server output stream before writes are blocked.
*/
public Builder backpressureThreshold(int backpressureThreshold) {
Preconditions.checkArgument(backpressureThreshold > 0);
this.backpressureThreshold = backpressureThreshold;
return this;
}

/**
* A small utility function to ensure that InputStream attributes.
* are closed if they are not null
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.flight.grpc;

import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;

/**
* An interceptor for specifying the number of bytes that can be queued before a call with an output stream
* gets blocked by backpressure.
*/
public class ServerBackpressureThresholdInterceptor implements ServerInterceptor {

private final int numBytes;

public ServerBackpressureThresholdInterceptor(int numBytes) {
this.numBytes = numBytes;
}

@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
call.setOnReadyThreshold(numBytes);
return next.startCall(call, headers);
}
}
2 changes: 1 addition & 1 deletion java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
<dep.slf4j.version>2.0.11</dep.slf4j.version>
<dep.guava-bom.version>33.0.0-jre</dep.guava-bom.version>
<dep.netty-bom.version>4.1.108.Final</dep.netty-bom.version>
<dep.grpc-bom.version>1.62.2</dep.grpc-bom.version>
<dep.grpc-bom.version>1.63.0</dep.grpc-bom.version>
<dep.protobuf-bom.version>3.23.1</dep.protobuf-bom.version>
<dep.jackson-bom.version>2.17.0</dep.jackson-bom.version>
<dep.hadoop.version>3.4.0</dep.hadoop.version>
Expand Down

0 comments on commit dd55c79

Please sign in to comment.