-
Notifications
You must be signed in to change notification settings - Fork 3.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
check pending stream completion at delayed transport lifecycle #7720
Changes from 3 commits
ac99972
1fe3bce
eb18861
b1f553a
81ab463
193eb48
9304375
56374dd
5024290
7022f5e
cf2d51b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -66,6 +66,23 @@ final class DelayedClientTransport implements ManagedClientTransport { | |
@Nonnull | ||
@GuardedBy("lock") | ||
private Collection<PendingStream> pendingStreams = new LinkedHashSet<>(); | ||
@GuardedBy("lock") | ||
private Collection<PendingStream> toCheckCompletionStreams = new LinkedHashSet<>(); | ||
private Runnable pollForStreamTransferCompletion = new Runnable() { | ||
@Override | ||
public void run() { | ||
ArrayList<PendingStream> savedToCheckCompletionStreams; | ||
synchronized (lock) { | ||
savedToCheckCompletionStreams = new ArrayList<>(toCheckCompletionStreams); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You don't need copy the list, just saving the reference should be sufficient. |
||
if (!toCheckCompletionStreams.isEmpty()) { | ||
toCheckCompletionStreams = Collections.emptyList(); | ||
} | ||
} | ||
for (final PendingStream stream : savedToCheckCompletionStreams) { | ||
stream.awaitStreamTransferCompletion(); | ||
} | ||
} | ||
}; | ||
|
||
/** | ||
* When {@code shutdownStatus != null && !hasPendingStreams()}, then the transport is considered | ||
|
@@ -211,7 +228,7 @@ public void run() { | |
listener.transportShutdown(status); | ||
} | ||
}); | ||
if (!hasPendingStreams() && reportTransportTerminated != null) { | ||
if (!hasPendingStreams() && !hasUncommittedStreams() && reportTransportTerminated != null) { | ||
syncContext.executeLater(reportTransportTerminated); | ||
reportTransportTerminated = null; | ||
} | ||
|
@@ -227,19 +244,27 @@ public void run() { | |
public final void shutdownNow(Status status) { | ||
shutdown(status); | ||
Collection<PendingStream> savedPendingStreams; | ||
Collection<PendingStream> savedToCheckCompletionStreams; | ||
Runnable savedReportTransportTerminated; | ||
synchronized (lock) { | ||
savedPendingStreams = pendingStreams; | ||
savedToCheckCompletionStreams = toCheckCompletionStreams; | ||
savedReportTransportTerminated = reportTransportTerminated; | ||
reportTransportTerminated = null; | ||
if (!pendingStreams.isEmpty()) { | ||
pendingStreams = Collections.emptyList(); | ||
} | ||
if (!toCheckCompletionStreams.isEmpty()) { | ||
toCheckCompletionStreams = Collections.emptyList(); | ||
} | ||
} | ||
if (savedReportTransportTerminated != null) { | ||
for (PendingStream stream : savedPendingStreams) { | ||
stream.cancel(status); | ||
} | ||
for (PendingStream stream : savedToCheckCompletionStreams) { | ||
stream.awaitStreamTransferCompletion(); | ||
} | ||
syncContext.execute(savedReportTransportTerminated); | ||
} | ||
// If savedReportTransportTerminated == null, transportTerminated() has already been called in | ||
|
@@ -252,13 +277,26 @@ public final boolean hasPendingStreams() { | |
} | ||
} | ||
|
||
public final boolean hasUncommittedStreams() { | ||
voidzcy marked this conversation as resolved.
Show resolved
Hide resolved
|
||
synchronized (lock) { | ||
return !toCheckCompletionStreams.isEmpty(); | ||
} | ||
} | ||
|
||
@VisibleForTesting | ||
final int getPendingStreamsCount() { | ||
synchronized (lock) { | ||
return pendingStreams.size(); | ||
} | ||
} | ||
|
||
@VisibleForTesting | ||
final int getUncommittedStreamsCount() { | ||
synchronized (lock) { | ||
return toCheckCompletionStreams.size(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What about using the same name for |
||
} | ||
} | ||
|
||
/** | ||
* Use the picker to try picking a transport for every pending stream, proceed the stream if the | ||
* pick is successful, otherwise keep it pending. | ||
|
@@ -270,48 +308,61 @@ final int getPendingStreamsCount() { | |
* <p>This method <strong>must not</strong> be called concurrently with itself. | ||
*/ | ||
final void reprocess(@Nullable SubchannelPicker picker) { | ||
ArrayList<PendingStream> toProcess; | ||
ArrayList<PendingStream> toCreateRealStream; | ||
ArrayList<PendingStream> toCheckCompletion; | ||
synchronized (lock) { | ||
lastPicker = picker; | ||
lastPickerVersion++; | ||
if (picker == null || !hasPendingStreams()) { | ||
if ((picker == null || !hasPendingStreams()) && !hasUncommittedStreams()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think changing this line is not necessary. Regardless of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This line is because it needs to take care another case: when shutdown() is called but there are still uncommittedStreams, the shutdown path would return, then it will never shutdown. It relied on reprocess() to trigger the termination. (Similar to cancel() takes care of the last item and then trigger termination callback). This is different in shudownNow() which would take care of waiting uncommitted streams and then finalize termination. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should await() during shutdown(), because there is no "next" Another way is introducing an abstract method There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks. Indeed it appears that idle timeout is permanently cancelled immediately after shutdown is called so there is no next |
||
return; | ||
} | ||
toProcess = new ArrayList<>(pendingStreams); | ||
toCreateRealStream = new ArrayList<>(pendingStreams); | ||
toCheckCompletion = new ArrayList<>(toCheckCompletionStreams); | ||
} | ||
ArrayList<PendingStream> toRemove = new ArrayList<>(); | ||
ArrayList<PendingStream> newlyCreated = new ArrayList<>(); | ||
|
||
for (final PendingStream stream : toProcess) { | ||
PickResult pickResult = picker.pickSubchannel(stream.args); | ||
CallOptions callOptions = stream.args.getCallOptions(); | ||
final ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult, | ||
callOptions.isWaitForReady()); | ||
if (transport != null) { | ||
Executor executor = defaultAppExecutor; | ||
// createRealStream may be expensive. It will start real streams on the transport. If | ||
// there are pending requests, they will be serialized too, which may be expensive. Since | ||
// we are now on transport thread, we need to offload the work to an executor. | ||
if (callOptions.getExecutor() != null) { | ||
executor = callOptions.getExecutor(); | ||
} | ||
executor.execute(new Runnable() { | ||
|
||
if (picker != null) { | ||
for (final PendingStream stream : toCreateRealStream) { | ||
PickResult pickResult = picker.pickSubchannel(stream.args); | ||
CallOptions callOptions = stream.args.getCallOptions(); | ||
final ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult, | ||
callOptions.isWaitForReady()); | ||
if (transport != null) { | ||
Executor executor = defaultAppExecutor; | ||
// createRealStream may be expensive. It will start real streams on the transport. If | ||
// there are pending requests, they will be serialized too, which may be expensive. Since | ||
// we are now on transport thread, we need to offload the work to an executor. | ||
if (callOptions.getExecutor() != null) { | ||
executor = callOptions.getExecutor(); | ||
} | ||
executor.execute(new Runnable() { | ||
@Override | ||
public void run() { | ||
stream.createRealStream(transport); | ||
} | ||
}); | ||
toRemove.add(stream); | ||
} // else: stay pending | ||
newlyCreated.add(stream); | ||
} // else: stay pending | ||
} | ||
} | ||
toCheckCompletion.addAll(newlyCreated); | ||
ArrayList<PendingStream> completed = new ArrayList<>(); | ||
for (final PendingStream stream : toCheckCompletion) { | ||
if (stream.isStreamTransferCompleted()) { | ||
completed.add(stream); | ||
} | ||
} | ||
|
||
synchronized (lock) { | ||
// Between this synchronized and the previous one: | ||
// - Streams may have been cancelled, which may turn pendingStreams into emptiness. | ||
// - shutdown() may be called, which may turn pendingStreams into null. | ||
if (!hasPendingStreams()) { | ||
// - shutdownNow() may be called, which may turn pendingStreams into emptiness. | ||
if (!hasPendingStreams() && !hasUncommittedStreams()) { | ||
return; | ||
} | ||
pendingStreams.removeAll(toRemove); | ||
pendingStreams.removeAll(newlyCreated); | ||
toCheckCompletionStreams.addAll(newlyCreated); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would prefer making |
||
toCheckCompletionStreams.removeAll(completed); | ||
// Because delayed transport is long-lived, we take this opportunity to down-size the | ||
// hashmap. | ||
if (pendingStreams.isEmpty()) { | ||
|
@@ -322,9 +373,10 @@ public void run() { | |
// transport starting streams and setting in-use state. During the gap the whole channel's | ||
// in-use state may be false. However, it shouldn't cause spurious switching to idleness | ||
// (which would shutdown the transports and LoadBalancer) because the gap should be shorter | ||
// than IDLE_MODE_DEFAULT_TIMEOUT_MILLIS (1 second). | ||
// than IDLE_MODE_DEFAULT_TIMEOUT_MILLIS (30 millis). | ||
voidzcy marked this conversation as resolved.
Show resolved
Hide resolved
|
||
syncContext.executeLater(reportTransportNotInUse); | ||
if (shutdownStatus != null && reportTransportTerminated != null) { | ||
syncContext.executeLater(pollForStreamTransferCompletion); | ||
syncContext.executeLater(reportTransportTerminated); | ||
reportTransportTerminated = null; | ||
} | ||
|
@@ -367,6 +419,7 @@ public void cancel(Status reason) { | |
if (!hasPendingStreams() && justRemovedAnElement) { | ||
syncContext.executeLater(reportTransportNotInUse); | ||
if (shutdownStatus != null) { | ||
syncContext.executeLater(pollForStreamTransferCompletion); | ||
syncContext.executeLater(reportTransportTerminated); | ||
reportTransportTerminated = null; | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,6 +29,8 @@ | |
import java.io.InputStream; | ||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.concurrent.CountDownLatch; | ||
import java.util.concurrent.TimeUnit; | ||
import javax.annotation.concurrent.GuardedBy; | ||
|
||
/** | ||
|
@@ -59,6 +61,7 @@ class DelayedStream implements ClientStream { | |
private long startTimeNanos; | ||
@GuardedBy("this") | ||
private long streamSetTimeNanos; | ||
private final CountDownLatch realStreamStarted = new CountDownLatch(1); | ||
|
||
@Override | ||
public void setMaxInboundMessageSize(final int maxSize) { | ||
|
@@ -132,6 +135,24 @@ final void setStream(ClientStream stream) { | |
drainPendingCalls(); | ||
} | ||
|
||
protected boolean isStreamTransferCompleted() { | ||
return realStreamStarted.getCount() == 0; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note that Better avoid it in the main source if possible. |
||
} | ||
|
||
protected void awaitStreamTransferCompletion() { | ||
// Wait until accepted RPCs transfer to the real stream so that we can properly cancel or | ||
// shutdown. Not waiting for transfer completion may cause pending calls orphaned. #6283. | ||
boolean delegationComplete; | ||
try { | ||
delegationComplete = realStreamStarted.await(5, TimeUnit.SECONDS); | ||
} catch (InterruptedException ex) { | ||
delegationComplete = false; | ||
} | ||
if (!delegationComplete) { | ||
Thread.currentThread().interrupt(); | ||
voidzcy marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} | ||
|
||
/** | ||
* Called to transition {@code passThrough} to {@code true}. This method is not safe to be called | ||
* multiple times; the caller must ensure it will only be called once, ever. {@code this} lock | ||
|
@@ -221,12 +242,14 @@ public void start(ClientStreamListener listener) { | |
|
||
if (savedPassThrough) { | ||
realStream.start(listener); | ||
realStreamStarted.countDown(); | ||
} else { | ||
final ClientStreamListener finalListener = listener; | ||
delayOrExecute(new Runnable() { | ||
@Override | ||
public void run() { | ||
realStream.start(finalListener); | ||
realStreamStarted.countDown(); | ||
} | ||
}); | ||
} | ||
|
@@ -302,7 +325,11 @@ public void run() { | |
listenerToClose.closed(reason, new Metadata()); | ||
} | ||
drainPendingCalls(); | ||
if (!isStreamTransferCompleted()) { | ||
voidzcy marked this conversation as resolved.
Show resolved
Hide resolved
|
||
realStreamStarted.countDown(); | ||
} | ||
} | ||
awaitStreamTransferCompletion(); | ||
voidzcy marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
@GuardedBy("this") | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make it
final
?