Skip to content

Commit

Permalink
fix: Make Java gRPC client use timeouts as expected
Browse files Browse the repository at this point in the history
Signed-off-by: Jose Acevedo <sharp.acevedo@gmail.com>
  • Loading branch information
acevedosharp committed May 29, 2024
1 parent c08e6f9 commit 256351d
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 18 deletions.
38 changes: 23 additions & 15 deletions java/serving-client/src/main/java/dev/feast/FeastClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import feast.proto.serving.ServingServiceGrpc.ServingServiceBlockingStub;
import feast.proto.types.ValueProto;
import io.grpc.CallCredentials;
import io.grpc.Deadline;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
Expand All @@ -50,6 +49,7 @@ public class FeastClient implements AutoCloseable {

private final ManagedChannel channel;
private final ServingServiceBlockingStub stub;
private final long requestTimeout;

/**
* Create a client to access Feast Serving.
Expand All @@ -68,12 +68,14 @@ public static FeastClient create(String host, int port) {
*
* @param host hostname or ip address of Feast serving GRPC server
* @param port port number of Feast serving GRPC server
* @param deadline GRPC deadline of Feast serving GRPC server {@link Deadline}
* @param requestTimeout maximum duration for online retrievals from the GRPC server in
* milliseconds, use 0 for no timeout
* @return {@link FeastClient}
*/
public static FeastClient create(String host, int port, Deadline deadline) {
public static FeastClient create(String host, int port, long requestTimeout) {
// configure client with no security config.
return FeastClient.createSecure(host, port, SecurityConfig.newBuilder().build(), deadline);
return FeastClient.createSecure(
host, port, SecurityConfig.newBuilder().build(), requestTimeout);
}

/**
Expand All @@ -86,7 +88,7 @@ public static FeastClient create(String host, int port, Deadline deadline) {
* @return {@link FeastClient}
*/
public static FeastClient createSecure(String host, int port, SecurityConfig securityConfig) {
return createSecure(host, port, securityConfig, null);
return FeastClient.createSecure(host, port, securityConfig, 0);
}

/**
Expand All @@ -96,11 +98,17 @@ public static FeastClient createSecure(String host, int port, SecurityConfig sec
* @param port port number of Feast serving GRPC server
* @param securityConfig security options to configure the Feast client. See {@link
* SecurityConfig} for options.
* @param deadline GRPC deadline of Feast serving GRPC server {@link Deadline}
* @param requestTimeout maximum duration for online retrievals from the GRPC server in
* milliseconds
* @return {@link FeastClient}
*/
public static FeastClient createSecure(
String host, int port, SecurityConfig securityConfig, Deadline deadline) {
String host, int port, SecurityConfig securityConfig, long requestTimeout) {

if (requestTimeout < 0) {
throw new IllegalArgumentException("Request timeout can't be negative");
}

// Configure client TLS
ManagedChannel channel = null;
if (securityConfig.isTLSEnabled()) {
Expand All @@ -127,7 +135,7 @@ public static FeastClient createSecure(
channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext().build();
}

return new FeastClient(channel, securityConfig.getCredentials(), Optional.ofNullable(deadline));
return new FeastClient(channel, securityConfig.getCredentials(), requestTimeout);
}

/**
Expand Down Expand Up @@ -158,7 +166,10 @@ public List<Row> getOnlineFeatures(List<String> featureRefs, List<Row> entities)

requestBuilder.putAllEntities(getEntityValuesMap(entities));

GetOnlineFeaturesResponse response = stub.getOnlineFeatures(requestBuilder.build());
ServingServiceGrpc.ServingServiceBlockingStub timedStub =
requestTimeout != 0 ? stub.withDeadlineAfter(requestTimeout, TimeUnit.MILLISECONDS) : stub;

GetOnlineFeaturesResponse response = timedStub.getOnlineFeatures(requestBuilder.build());

List<Row> results = Lists.newArrayList();
if (response.getResultsCount() == 0) {
Expand Down Expand Up @@ -231,12 +242,13 @@ public List<Row> getOnlineFeatures(List<String> featureRefs, List<Row> rows, Str
}

protected FeastClient(ManagedChannel channel, Optional<CallCredentials> credentials) {
this(channel, credentials, Optional.empty());
this(channel, credentials, -1);
}

protected FeastClient(
ManagedChannel channel, Optional<CallCredentials> credentials, Optional<Deadline> deadline) {
ManagedChannel channel, Optional<CallCredentials> credentials, long requestTimeout) {
this.channel = channel;
this.requestTimeout = requestTimeout;
TracingClientInterceptor tracingInterceptor =
TracingClientInterceptor.newBuilder().withTracer(GlobalTracer.get()).build();

Expand All @@ -247,10 +259,6 @@ protected FeastClient(
servingStub = servingStub.withCallCredentials(credentials.get());
}

if (deadline.isPresent()) {
servingStub = servingStub.withDeadline(deadline.get());
}

this.stub = servingStub;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,14 @@
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

public class FeastClientTest {
private final String AUTH_TOKEN = "test token";
private final Deadline DEADLINE = Deadline.after(2, TimeUnit.SECONDS);
private final long TIMEOUT_MILLIS = 100;

@Rule public GrpcCleanupRule grpcRule;
private AtomicBoolean isAuthenticated;
Expand Down Expand Up @@ -88,7 +87,7 @@ public void setup() throws Exception {
ManagedChannel channel =
this.grpcRule.register(
InProcessChannelBuilder.forName(serverName).directExecutor().build());
this.client = new FeastClient(channel, Optional.empty(), Optional.of(DEADLINE));
this.client = new FeastClient(channel, Optional.empty(), TIMEOUT_MILLIS);
}

@Test
Expand Down

0 comments on commit 256351d

Please sign in to comment.