Skip to content

Commit

Permalink
Pass forceExecution flag to transport interceptor (#22739)
Browse files Browse the repository at this point in the history
To effectively allow a plugin to intercept a transport handler it needs
to know if the handler must be executed even if there is a rejection on the
thread pool in the case the wrapper forks a thread to execute the actual handler.
  • Loading branch information
s1monw committed Jan 23, 2017
1 parent 8e54453 commit d638819
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.plugins.NetworkPlugin;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.tasks.RawTaskStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -238,9 +237,10 @@ private CompositeTransportInterceptor(List<TransportInterceptor> transportInterc

@Override
public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(String action, String executor,
boolean forceExecution,
TransportRequestHandler<T> actualHandler) {
for (TransportInterceptor interceptor : this.transportInterceptors) {
actualHandler = interceptor.interceptHandler(action, executor, actualHandler);
actualHandler = interceptor.interceptHandler(action, executor, forceExecution, actualHandler);
}
return actualHandler;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public interface TransportInterceptor {
* used instead of the passed in handler. By default the provided handler is returned.
*/
default <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(String action, String executor,
boolean forceExecution,
TransportRequestHandler<T> actualHandler) {
return actualHandler;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,7 @@ public TransportAddress[] addressesFromString(String address, int perAddressLimi
*/
public <Request extends TransportRequest> void registerRequestHandler(String action, Supplier<Request> requestFactory,
String executor, TransportRequestHandler<Request> handler) {
handler = interceptor.interceptHandler(action, executor, handler);
handler = interceptor.interceptHandler(action, executor, false, handler);
RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(
action, requestFactory, taskManager, handler, executor, false, true);
registerRequestHandler(reg);
Expand All @@ -695,7 +695,7 @@ public <Request extends TransportRequest> void registerRequestHandler(String act
String executor, boolean forceExecution,
boolean canTripCircuitBreaker,
TransportRequestHandler<Request> handler) {
handler = interceptor.interceptHandler(action, executor, handler);
handler = interceptor.interceptHandler(action, executor, forceExecution, handler);
RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(
action, request, taskManager, handler, executor, forceExecution, canTripCircuitBreaker);
registerRequestHandler(reg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,7 @@ public List<TransportInterceptor> getTransportInterceptors(ThreadContext threadC

@Override
public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(String action, String executor,
boolean forceExecution,
TransportRequestHandler<T> actualHandler) {
return new InterceptingRequestHandler<>(action, actualHandler);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,9 @@ public List<TransportInterceptor> getTransportInterceptors(ThreadContext threadC
return Collections.singletonList(new TransportInterceptor() {
@Override
public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(String action, String executor,
boolean forceExecution,
TransportRequestHandler<T> actualHandler) {
return instance.interceptHandler(action, executor, actualHandler);
return instance.interceptHandler(action, executor, forceExecution, actualHandler);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportInterceptor;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestHandler;

import java.io.IOException;
import java.util.Arrays;
Expand All @@ -50,6 +52,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class NetworkModuleTests extends ModuleTestCase {
Expand Down Expand Up @@ -251,8 +254,21 @@ public void testRegisterInterceptor() {
Settings settings = Settings.builder()
.put(NetworkModule.HTTP_ENABLED.getKey(), false)
.put(NetworkModule.TRANSPORT_TYPE_KEY, "local").build();
AtomicInteger called = new AtomicInteger(0);

TransportInterceptor interceptor = new TransportInterceptor() {
@Override
public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(String action, String executor,
boolean forceExecution,
TransportRequestHandler<T> actualHandler) {
called.incrementAndGet();
if ("foo/bar/boom".equals(action)) {
assertTrue(forceExecution);
} else {
assertFalse(forceExecution);
}
return actualHandler;
}
};
NetworkModule module = newNetworkModule(settings, false, new NetworkPlugin() {
@Override
Expand All @@ -263,6 +279,11 @@ public List<TransportInterceptor> getTransportInterceptors(ThreadContext threadC
});

TransportInterceptor transportInterceptor = module.getTransportInterceptor();
assertEquals(0, called.get());
transportInterceptor.interceptHandler("foo/bar/boom", null, true, null);
assertEquals(1, called.get());
transportInterceptor.interceptHandler("foo/baz/boom", null, false, null);
assertEquals(2, called.get());
assertTrue(transportInterceptor instanceof NetworkModule.CompositeTransportInterceptor);
assertEquals(((NetworkModule.CompositeTransportInterceptor)transportInterceptor).transportInterceptors.size(), 1);
assertSame(((NetworkModule.CompositeTransportInterceptor)transportInterceptor).transportInterceptors.get(0), interceptor);
Expand Down

0 comments on commit d638819

Please sign in to comment.