Skip to content

Commit

Permalink
Recover from executor shutdowns gracefully.
Browse files Browse the repository at this point in the history
This turns out to be pretty difficult because of the way our
dispatcher works.

Calls can be rejected either immediately when the user calls
enqueue(), or later when a queued call is promoted.

It's also awkward because we don't want to hold locks when
calling the user's callFailed() method.
  • Loading branch information
squarejesse committed Nov 4, 2018
1 parent 2b0a9f4 commit dfd6c95
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 18 deletions.
57 changes: 56 additions & 1 deletion okhttp-tests/src/test/java/okhttp3/DispatcherTest.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package okhttp3;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand All @@ -10,6 +11,7 @@
import java.util.Set;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import okhttp3.RealCall.AsyncCall;
Expand All @@ -28,13 +30,16 @@ public final class DispatcherTest {
RecordingCallback callback = new RecordingCallback();
RecordingWebSocketListener webSocketListener = new RecordingWebSocketListener();
Dispatcher dispatcher = new Dispatcher(executor);
RecordingEventListener listener = new RecordingEventListener();
OkHttpClient client = defaultClient().newBuilder()
.dispatcher(dispatcher)
.eventListener(listener)
.build();

@Before public void setUp() throws Exception {
dispatcher.setMaxRequests(20);
dispatcher.setMaxRequestsPerHost(10);
listener.forbidLock(dispatcher);
}

@Test public void maxRequestsZero() throws Exception {
Expand Down Expand Up @@ -264,6 +269,54 @@ public final class DispatcherTest {
assertTrue(idle.get());
}

@Test public void executionRejectedImmediately() throws Exception {
Request request = newRequest("http://a/1");
executor.shutdown();
client.newCall(request).enqueue(callback);
callback.await(request.url()).assertFailure(InterruptedIOException.class);
assertEquals(Arrays.asList("CallStart", "CallFailed"), listener.recordedEventTypes());
}

@Test public void executionRejectedAfterMaxRequestsChange() throws Exception {
Request request1 = newRequest("http://a/1");
Request request2 = newRequest("http://a/2");
dispatcher.setMaxRequests(1);
client.newCall(request1).enqueue(callback);
executor.shutdown();
client.newCall(request2).enqueue(callback);
dispatcher.setMaxRequests(2); // Trigger promotion.
callback.await(request2.url()).assertFailure(InterruptedIOException.class);

assertEquals(Arrays.asList("CallStart", "CallStart", "CallFailed"),
listener.recordedEventTypes());
}

@Test public void executionRejectedAfterMaxRequestsPerHostChange() throws Exception {
Request request1 = newRequest("http://a/1");
Request request2 = newRequest("http://a/2");
dispatcher.setMaxRequestsPerHost(1);
client.newCall(request1).enqueue(callback);
executor.shutdown();
client.newCall(request2).enqueue(callback);
dispatcher.setMaxRequestsPerHost(2); // Trigger promotion.
callback.await(request2.url()).assertFailure(InterruptedIOException.class);
assertEquals(Arrays.asList("CallStart", "CallStart", "CallFailed"),
listener.recordedEventTypes());
}

@Test public void executionRejectedAfterPrecedingCallFinishes() throws Exception {
Request request1 = newRequest("http://a/1");
Request request2 = newRequest("http://a/2");
dispatcher.setMaxRequests(1);
client.newCall(request1).enqueue(callback);
executor.shutdown();
client.newCall(request2).enqueue(callback);
executor.finishJob("http://a/1"); // Trigger promotion.
callback.await(request2.url()).assertFailure(InterruptedIOException.class);
assertEquals(Arrays.asList("CallStart", "CallStart", "CallFailed"),
listener.recordedEventTypes());
}

private <T> Set<T> set(T... values) {
return set(Arrays.asList(values));
}
Expand All @@ -287,9 +340,11 @@ private Thread makeSynchronousCall(final Call call) {
}

class RecordingExecutor extends AbstractExecutorService {
private boolean shutdown;
private List<AsyncCall> calls = new ArrayList<>();

@Override public void execute(Runnable command) {
if (shutdown) throw new RejectedExecutionException();
calls.add((AsyncCall) command);
}

Expand All @@ -314,7 +369,7 @@ public void finishJob(String url) {
}

@Override public void shutdown() {
throw new UnsupportedOperationException();
shutdown = true;
}

@Override public List<Runnable> shutdownNow() {
Expand Down
69 changes: 52 additions & 17 deletions okhttp/src/main/java/okhttp3/Dispatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import okhttp3.RealCall.AsyncCall;
import okhttp3.internal.Util;

import static java.util.Collections.emptyList;

/**
* Policy on when async requests are executed.
*
Expand Down Expand Up @@ -75,12 +77,18 @@ public synchronized ExecutorService executorService() {
* <p>If more than {@code maxRequests} requests are in flight when this is invoked, those requests
* will remain in flight.
*/
public synchronized void setMaxRequests(int maxRequests) {
public void setMaxRequests(int maxRequests) {
if (maxRequests < 1) {
throw new IllegalArgumentException("max < 1: " + maxRequests);
}
this.maxRequests = maxRequests;
promoteCalls();
List<AsyncCall> promotedCalls;
synchronized (this) {
this.maxRequests = maxRequests;
promotedCalls = promoteCalls();
}
for (int i = 0, size = promotedCalls.size(); i < size; i++) {
promotedCalls.get(i).executeOn(executorService());
}
}

public synchronized int getMaxRequests() {
Expand All @@ -98,12 +106,18 @@ public synchronized int getMaxRequests() {
*
* <p>WebSocket connections to hosts <b>do not</b> count against this limit.
*/
public synchronized void setMaxRequestsPerHost(int maxRequestsPerHost) {
public void setMaxRequestsPerHost(int maxRequestsPerHost) {
if (maxRequestsPerHost < 1) {
throw new IllegalArgumentException("max < 1: " + maxRequestsPerHost);
}
this.maxRequestsPerHost = maxRequestsPerHost;
promoteCalls();
List<AsyncCall> promotedCalls;
synchronized (this) {
this.maxRequestsPerHost = maxRequestsPerHost;
promotedCalls = promoteCalls();
}
for (int i = 0, size = promotedCalls.size(); i < size; i++) {
promotedCalls.get(i).executeOn(executorService());
}
}

public synchronized int getMaxRequestsPerHost() {
Expand All @@ -126,13 +140,16 @@ public synchronized void setIdleCallback(@Nullable Runnable idleCallback) {
this.idleCallback = idleCallback;
}

synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
void enqueue(AsyncCall call) {
synchronized (this) {
if (runningAsyncCalls.size() >= maxRequests
|| runningCallsForHost(call) >= maxRequestsPerHost) {
readyAsyncCalls.add(call);
return; // This call won't be executed until another completes.
}
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
call.executeOn(executorService());
}

/**
Expand All @@ -153,21 +170,32 @@ public synchronized void cancelAll() {
}
}

private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
/**
* Returns the calls that have been promoted, and that the caller must {@link AsyncCall#executeOn
* execute}. This method doesn't do so directly because this is synchronized on the dispatcher
* and executing calls must not be.
*/
private List<AsyncCall> promoteCalls() {
assert (Thread.holdsLock(this));

List<AsyncCall> result = emptyList();
if (runningAsyncCalls.size() >= maxRequests) return result; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return result; // No ready calls to promote.

for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();

if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
if (result.isEmpty()) result = new ArrayList<>();
result.add(call);
runningAsyncCalls.add(call);
executorService().execute(call);
}

if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
if (runningAsyncCalls.size() >= maxRequests) break; // Reached max capacity.
}

return result;
}

/** Returns the number of running calls that share a host with {@code call}. */
Expand Down Expand Up @@ -198,13 +226,20 @@ void finished(RealCall call) {
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
List<AsyncCall> promotedCalls;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
if (promoteCalls) promoteCalls();
promotedCalls = promoteCalls
? promoteCalls()
: Collections.<AsyncCall>emptyList();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}

for (int i = 0, size = promotedCalls.size(); i < size; i++) {
promotedCalls.get(i).executeOn(executorService());
}

if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
Expand Down
25 changes: 25 additions & 0 deletions okhttp/src/main/java/okhttp3/RealCall.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@
package okhttp3;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import javax.annotation.Nullable;
import okhttp3.internal.NamedRunnable;
import okhttp3.internal.cache.CacheInterceptor;
Expand Down Expand Up @@ -142,6 +145,28 @@ RealCall get() {
return RealCall.this;
}

/**
* Attempt to enqueue this async call on {@code executorService}. This will attempt to clean up
* if the executor has been shut down by reporting the call as failed.
*/
void executeOn(ExecutorService executorService) {
assert (!Thread.holdsLock(client.dispatcher()));
boolean success = false;
try {
executorService.execute(this);
success = true;
} catch (RejectedExecutionException e) {
InterruptedIOException ioException = new InterruptedIOException("executor rejected");
ioException.initCause(e);
eventListener.callFailed(RealCall.this, ioException);
responseCallback.onFailure(RealCall.this, ioException);
} finally {
if (!success) {
client.dispatcher().finished(this); // This call is no longer running!
}
}
}

@Override protected void execute() {
boolean signalledCallback = false;
try {
Expand Down

0 comments on commit dfd6c95

Please sign in to comment.