Skip to content

Commit

Permalink
Do not create a connector multiple times for each rx() call
Browse files Browse the repository at this point in the history
Priority is to use @ClientAsyncExecutor annotated ExecutorServiceProvider

Signed-off-by: jansupol <jan.supol@oracle.com>
  • Loading branch information
jansupol committed Feb 11, 2021
1 parent 6e10965 commit d956c52
Show file tree
Hide file tree
Showing 2 changed files with 216 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2011, 2020 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2011, 2021 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand All @@ -19,6 +19,7 @@
import java.lang.reflect.Type;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CancellationException;
Expand Down Expand Up @@ -61,10 +62,14 @@
import org.glassfish.jersey.client.internal.ClientResponseProcessingException;
import org.glassfish.jersey.client.internal.LocalizationMessages;
import org.glassfish.jersey.internal.MapPropertiesDelegate;
import org.glassfish.jersey.internal.inject.Bindings;
import org.glassfish.jersey.internal.inject.DisposableSupplier;
import org.glassfish.jersey.internal.inject.Providers;
import org.glassfish.jersey.internal.inject.ServiceHolder;
import org.glassfish.jersey.internal.util.Producer;
import org.glassfish.jersey.internal.util.PropertiesHelper;
import org.glassfish.jersey.internal.util.ReflectionHelper;
import org.glassfish.jersey.process.internal.ExecutorProviders;
import org.glassfish.jersey.process.internal.RequestScope;
import org.glassfish.jersey.spi.ExecutorServiceProvider;

Expand Down Expand Up @@ -474,7 +479,7 @@ public <T extends RxInvoker> T rx(Class<T> clazz) {
if (configured == null) {
final ExecutorService provided = executorService();
if (provided != null) {
request().getClientConfig().executorService(provided);
((ClientConfig) request().getConfiguration()).executorService(provided);
}
}
return (T) new JerseyCompletionStageRxInvoker(this);
Expand All @@ -498,9 +503,36 @@ private ExecutorService executorService() {
return result;
}

return this.requestContext.getInjectionManager()
.getInstance(ExecutorServiceProvider.class)
.getExecutorService();
final List<ServiceHolder<ExecutorServiceProvider>> serviceHolders =
this.requestContext.getInjectionManager().getAllServiceHolders(ExecutorServiceProvider.class);

BestServiceHolder best = serviceHolders.stream()
.map(BestServiceHolder::new).sorted((a, b) -> a.isBetterThen(b) ? -1 : 1).findFirst().get();

return best.provider.getExecutorService();
}

/*
* Priority goes to: 1) user async
* 2) user nonasync
* 3) default async
*/
private static final class BestServiceHolder {
private final ExecutorServiceProvider provider;
private final int value;

private BestServiceHolder(ServiceHolder<ExecutorServiceProvider> holder) {
provider = holder.getInstance();
boolean isDefault = DefaultClientAsyncExecutorProvider.class.equals(holder.getImplementationClass())
|| ClientExecutorProvidersConfigurator.ClientExecutorServiceProvider.class
.equals(holder.getImplementationClass());
boolean isAsync = holder.getImplementationClass().getAnnotation(ClientAsyncExecutor.class) != null;
value = 10 * (isDefault ? 0 : 1) + (isAsync ? 1 : 0);
}

public boolean isBetterThen(BestServiceHolder other) {
return this.value > other.value;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017, 2020 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2017, 2021 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
Expand All @@ -16,9 +16,16 @@

package org.glassfish.jersey.client;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import javax.ws.rs.client.Client;
Expand All @@ -29,10 +36,12 @@
import javax.ws.rs.client.RxInvokerProvider;
import javax.ws.rs.client.SyncInvoker;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Configuration;
import javax.ws.rs.core.GenericType;
import javax.ws.rs.core.Response;
import javax.ws.rs.ext.Provider;

import org.glassfish.jersey.client.spi.Connector;
import org.glassfish.jersey.internal.guava.ThreadFactoryBuilder;

import org.glassfish.jersey.spi.ExecutorServiceProvider;
Expand All @@ -54,8 +63,9 @@
*/
public class ClientRxTest {

private static final ExecutorService EXECUTOR_SERVICE =
Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("rxTest-%d").build());
private static final ExecutorService EXECUTOR_SERVICE = new ClientRxExecutorServiceWrapper(
Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("rxTest-%d").build())
);

private final Client CLIENT;
private final Client CLIENT_WITH_EXECUTOR;
Expand Down Expand Up @@ -160,6 +170,35 @@ public void testRxInvokerNotRegistered() {
request.rx(TestRxInvoker.class).get();
}

@Test
public void testConnectorIsReusedWhenRx() throws ExecutionException, InterruptedException {
final AtomicInteger atomicInteger = new AtomicInteger(0);
HttpUrlConnectorProvider provider = new HttpUrlConnectorProvider() {
@Override
public Connector getConnector(Client client, Configuration config) {
atomicInteger.incrementAndGet();
return super.getConnector(client, config);
}
};

ClientConfig clientConfig = new ClientConfig();
clientConfig.connectorProvider(provider);

ClientRequestFilter abortFilter = (f) -> { f.abortWith(Response.ok().build()); };
Client client = ClientBuilder.newClient(clientConfig).register(abortFilter);

AtomicReference<String> threadName = new AtomicReference<>();
for (int cnt = 0; cnt != 5; cnt++) {
try (Response r = target(client)
.request().rx().get().toCompletableFuture().get()) {

assertEquals(200, r.getStatus());
assertEquals(1, atomicInteger.get());
}
}

}

private WebTarget target(Client client) {
// Uri is not relevant, the call won't be ever executed.
return client.target("http://localhost:9999");
Expand Down Expand Up @@ -207,4 +246,141 @@ public void dispose(ExecutorService executorService) {
//@After
}
}

// -----------------------------------------------------------------------------------------------------

@Test
public void testRxInvokerWithPriorityExecutorServiceProvider() {
AtomicReference<String> threadName = new AtomicReference<>();
String s = target(CLIENT)
.register(PriorityTestRxInvokerProvider.class)
.register(TestExecutorServiceProvider.class)
.register(PriorityTestExecutorServiceProvider.class)
.request().rx(PriorityTestRxInvoker.class).get();

assertTrue("Provided RxInvoker was not used.", s.startsWith("PriorityTestRxInvoker"));
assertTrue("@ClientAsyncExecutor Executor Service was not passed to RxInvoker", s.contains("TRUE"));
}

@ClientAsyncExecutor
private static class PriorityTestExecutorServiceProvider extends TestExecutorServiceProvider {
@Override
public ExecutorService getExecutorService() {
return new ClientRxExecutorServiceWrapper(EXECUTOR_SERVICE) {
//new class
};
}
}

@Provider
public static class PriorityTestRxInvokerProvider implements RxInvokerProvider<PriorityTestRxInvoker> {
@Override
public PriorityTestRxInvoker getRxInvoker(SyncInvoker syncInvoker, ExecutorService executorService) {
return new PriorityTestRxInvoker(syncInvoker, executorService);
}

@Override
public boolean isProviderFor(Class<?> clazz) {
return PriorityTestRxInvoker.class.equals(clazz);
}
}

private static class PriorityTestRxInvoker extends AbstractRxInvoker<String> {

private PriorityTestRxInvoker(SyncInvoker syncInvoker, ExecutorService executor) {
super(syncInvoker, executor);
}

@Override
public <R> String method(String name, Entity<?> entity, Class<R> responseType) {
return "PriorityTestRxInvoker " + (getExecutorService() != null
&& !ClientRxExecutorServiceWrapper.class.equals(getExecutorService().getClass())
&& ClientRxExecutorServiceWrapper.class.isInstance(getExecutorService()) ? "TRUE" : "FALSE");
}

@Override
public <R> String method(String name, Entity<?> entity, GenericType<R> responseType) {
return method(null, null, (Class<?>) null);
}
}

// -----------------------------------------------------------------------------------------------------

/**
* Wrap the executor service to distinguish the executor service obtained from the Injection Manager by class name
*/
private static class ClientRxExecutorServiceWrapper implements ExecutorService {
private final ExecutorService executorService;

private ClientRxExecutorServiceWrapper(ExecutorService executorService) {
this.executorService = executorService;
}

@Override
public void shutdown() {
executorService.shutdown();
}

@Override
public List<Runnable> shutdownNow() {
return executorService.shutdownNow();
}

@Override
public boolean isShutdown() {
return executorService.isShutdown();
}

@Override
public boolean isTerminated() {
return executorService.isTerminated();
}

@Override
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return executorService.awaitTermination(timeout, unit);
}

@Override
public <T> Future<T> submit(Callable<T> task) {
return executorService.submit(task);
}

@Override
public <T> Future<T> submit(Runnable task, T result) {
return executorService.submit(task, result);
}

@Override
public Future<?> submit(Runnable task) {
return executorService.submit(task);
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
return executorService.invokeAll(tasks);
}

@Override
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException {
return invokeAll(tasks, timeout, unit);
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
return invokeAny(tasks);
}

@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return invokeAny(tasks, timeout, unit);
}

@Override
public void execute(Runnable command) {
executorService.execute(command);
}
}
}

0 comments on commit d956c52

Please sign in to comment.