Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pubsub: fix race condition in streaming connection #2416

Merged
merged 8 commits into from
Oct 4, 2017
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
Expand All @@ -52,14 +54,15 @@ final class StreamingSubscriberConnection extends AbstractApiService implements
private static final Duration MAX_CHANNEL_RECONNECT_BACKOFF = Duration.ofSeconds(10);
private static final int MAX_PER_REQUEST_CHANGES = 10000;

private final AtomicLong channelReconnectBackoffMillis =
new AtomicLong(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis());

private final SubscriberStub asyncStub;

private final String subscription;
private final ScheduledExecutorService executor;
private final MessageDispatcher messageDispatcher;

private final AtomicLong channelReconnectBackoffMillis =
new AtomicLong(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis());

private final Lock lock = new ReentrantLock();

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

private ClientCallStreamObserver<StreamingPullRequest> requestObserver;

public StreamingSubscriberConnection(
Expand Down Expand Up @@ -101,23 +104,35 @@ protected void doStart() {
@Override
protected void doStop() {
messageDispatcher.stop();
requestObserver.onError(Status.CANCELLED.asException());
notifyStopped();

lock.lock();
try {
requestObserver.onError(Status.CANCELLED.asException());

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

} finally {
lock.unlock();
notifyStopped();
}
}

private class StreamingPullResponseObserver
implements ClientResponseObserver<StreamingPullRequest, StreamingPullResponse> {

final SettableFuture<Void> errorFuture;
ClientCallStreamObserver<StreamingPullRequest> thisRequestObserver;

StreamingPullResponseObserver(SettableFuture<Void> errorFuture) {
this.errorFuture = errorFuture;
}

@Override
public void beforeStart(ClientCallStreamObserver<StreamingPullRequest> requestObserver) {
StreamingSubscriberConnection.this.requestObserver = requestObserver;
requestObserver.disableAutoInboundFlowControl();
thisRequestObserver = requestObserver;
lock.lock();

This comment was marked as spam.

This comment was marked as spam.

try {
requestObserver.disableAutoInboundFlowControl();
} finally {
lock.unlock();
}
}

@Override
Expand All @@ -128,9 +143,22 @@ public void onNext(StreamingPullResponse response) {
new Runnable() {
@Override
public void run() {
// Request one more batch of messages only if we have not been shut down.
/**
* Request one more batch of messages only if we have not been shut down.
*
* <p>We use the request observer this response observer is paired with, not the "current
* observer" in the outer class. By the time this Runnable runs, the stream might have
* already failed and the outer class might have created a new stream. In that case we
* deliberately send the request to the already-failed stream, where it is a no-op;
* sending it to the current observer instead could make the new stream deliver more
* messages than it should.
*/
if (isAlive()) {
requestObserver.request(1);
lock.lock();

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

try {
thisRequestObserver.request(1);

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

} finally {
lock.unlock();
}
}
}
});
Expand Down Expand Up @@ -166,6 +194,18 @@ private void initialize() {
.build());
requestObserver.request(1);

/**
* Publish the new request observer to the shared field only after the initial request
* (subscription name and deadline) has been sent on it. Otherwise, some other thread might
* use this stream to send something else before we have sent the first request.
*/
lock.lock();
try {
this.requestObserver = requestObserver;
} finally {
lock.unlock();
}

Futures.addCallback(
errorFuture,
new FutureCallback<Void>() {
Expand All @@ -188,24 +228,24 @@ public void onFailure(Throwable cause) {
return;
}
logger.log(Level.WARNING, "Terminated streaming with exception", cause);
if (StatusUtil.isRetryable(cause)) {
long backoffMillis = channelReconnectBackoffMillis.get();
long newBackoffMillis =
Math.min(backoffMillis * 2, MAX_CHANNEL_RECONNECT_BACKOFF.toMillis());
channelReconnectBackoffMillis.set(newBackoffMillis);

executor.schedule(
new Runnable() {
@Override
public void run() {
initialize();
}
},
backoffMillis,
TimeUnit.MILLISECONDS);
} else {
if (!StatusUtil.isRetryable(cause)) {
notifyFailed(cause);
return;
}
long backoffMillis = channelReconnectBackoffMillis.get();
long newBackoffMillis =
Math.min(backoffMillis * 2, MAX_CHANNEL_RECONNECT_BACKOFF.toMillis());
channelReconnectBackoffMillis.set(newBackoffMillis);

executor.schedule(
new Runnable() {
@Override
public void run() {
initialize();
}
},
backoffMillis,
TimeUnit.MILLISECONDS);
}
},
executor);
Expand All @@ -220,8 +260,13 @@ public void sendAckOperations(
List<String> acksToSend, List<PendingModifyAckDeadline> ackDeadlineExtensions) {
List<StreamingPullRequest> requests =
partitionAckOperations(acksToSend, ackDeadlineExtensions, MAX_PER_REQUEST_CHANGES);
for (StreamingPullRequest request : requests) {
requestObserver.onNext(request);
lock.lock();

This comment was marked as spam.

This comment was marked as spam.

try {
for (StreamingPullRequest request : requests) {
requestObserver.onNext(request);
}
} finally {
lock.unlock();
}
}

Expand Down Expand Up @@ -271,9 +316,14 @@ static List<StreamingPullRequest> partitionAckOperations(

public void updateStreamAckDeadline(int newAckDeadlineSeconds) {
messageDispatcher.setMessageDeadlineSeconds(newAckDeadlineSeconds);
requestObserver.onNext(
StreamingPullRequest.newBuilder()
.setStreamAckDeadlineSeconds(newAckDeadlineSeconds)
.build());
lock.lock();

This comment was marked as spam.

This comment was marked as spam.

try {
requestObserver.onNext(
StreamingPullRequest.newBuilder()
.setStreamAckDeadlineSeconds(newAckDeadlineSeconds)
.build());
} finally {
lock.unlock();
}
}
}