Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pubsub: fix race condition in streaming connection #2416

Merged
merged 8 commits into from
Oct 4, 2017
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
Expand All @@ -53,14 +55,15 @@ final class StreamingSubscriberConnection extends AbstractApiService implements
private static final Duration MAX_CHANNEL_RECONNECT_BACKOFF = Duration.ofSeconds(10);
private static final int MAX_PER_REQUEST_CHANGES = 10000;

private final AtomicLong channelReconnectBackoffMillis =
new AtomicLong(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis());

private final SubscriberStub asyncStub;

private final String subscription;
private final ScheduledExecutorService executor;
private final MessageDispatcher messageDispatcher;

private final AtomicLong channelReconnectBackoffMillis =
new AtomicLong(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis());

private final Lock lock = new ReentrantLock();

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

private ClientCallStreamObserver<StreamingPullRequest> requestObserver;

public StreamingSubscriberConnection(
Expand Down Expand Up @@ -104,22 +107,37 @@ protected void doStart() {
@Override
protected void doStop() {
messageDispatcher.stop();
requestObserver.onError(Status.CANCELLED.asException());
notifyStopped();

lock.lock();
try {
requestObserver.onError(Status.CANCELLED.asException());

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

} finally {
lock.unlock();
notifyStopped();
}
}

private class StreamingPullResponseObserver
implements ClientResponseObserver<StreamingPullRequest, StreamingPullResponse> {

final SettableFuture<Void> errorFuture;

/**
* When a batch finsihes processing, we want to request one more batch from the server. But by
* the time this happens, our stream might have already errored, and new stream created. We
* don't want to request more batches from the new stream -- that might pull more messages than
* the user can deal with -- so we save the request observer this response observer is "paired
* with". If the stream has already errored, requesting more messages is a no-op.
*/
ClientCallStreamObserver<StreamingPullRequest> thisRequestObserver;

StreamingPullResponseObserver(SettableFuture<Void> errorFuture) {
this.errorFuture = errorFuture;
}

@Override
public void beforeStart(ClientCallStreamObserver<StreamingPullRequest> requestObserver) {
StreamingSubscriberConnection.this.requestObserver = requestObserver;
thisRequestObserver = requestObserver;
requestObserver.disableAutoInboundFlowControl();
}

Expand All @@ -131,9 +149,18 @@ public void onNext(StreamingPullResponse response) {
new Runnable() {
@Override
public void run() {
// Only if not shutdown we will request one more batches of messages to be delivered.
if (isAlive()) {
requestObserver.request(1);
// Only request more if we're not shutdown.
// If errorFuture is done, the stream has either failed or hung up,
// and we don't need to request.
if (isAlive() && !errorFuture.isDone()) {
lock.lock();

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

try {
thisRequestObserver.request(1);
} catch (Exception e) {
logger.log(Level.WARNING, "cannot request more messages", e);
} finally {
lock.unlock();
}
}
}
});
Expand Down Expand Up @@ -169,6 +196,18 @@ private void initialize() {
.build());
requestObserver.request(1);

/**
* Must make sure we do this after sending the subscription name and deadline. Otherwise, some
* other thread might use this stream to do something else before we could send the first
* request.
*/
lock.lock();
try {
this.requestObserver = requestObserver;
} finally {
lock.unlock();
}

Futures.addCallback(
errorFuture,
new FutureCallback<Void>() {
Expand All @@ -191,24 +230,24 @@ public void onFailure(Throwable cause) {
return;
}
logger.log(Level.WARNING, "Terminated streaming with exception", cause);
if (StatusUtil.isRetryable(cause)) {
long backoffMillis = channelReconnectBackoffMillis.get();
long newBackoffMillis =
Math.min(backoffMillis * 2, MAX_CHANNEL_RECONNECT_BACKOFF.toMillis());
channelReconnectBackoffMillis.set(newBackoffMillis);

executor.schedule(
new Runnable() {
@Override
public void run() {
initialize();
}
},
backoffMillis,
TimeUnit.MILLISECONDS);
} else {
if (!StatusUtil.isRetryable(cause)) {
notifyFailed(cause);
return;
}
long backoffMillis = channelReconnectBackoffMillis.get();
long newBackoffMillis =
Math.min(backoffMillis * 2, MAX_CHANNEL_RECONNECT_BACKOFF.toMillis());
channelReconnectBackoffMillis.set(newBackoffMillis);

executor.schedule(
new Runnable() {
@Override
public void run() {
initialize();
}
},
backoffMillis,
TimeUnit.MILLISECONDS);
}
},
executor);
Expand All @@ -223,8 +262,15 @@ public void sendAckOperations(
List<String> acksToSend, List<PendingModifyAckDeadline> ackDeadlineExtensions) {
List<StreamingPullRequest> requests =
partitionAckOperations(acksToSend, ackDeadlineExtensions, MAX_PER_REQUEST_CHANGES);
for (StreamingPullRequest request : requests) {
requestObserver.onNext(request);
lock.lock();

This comment was marked as spam.

This comment was marked as spam.

try {
for (StreamingPullRequest request : requests) {
requestObserver.onNext(request);
}
} catch (Exception e) {
logger.log(Level.WARNING, "failed to send acks", e);
} finally {
lock.unlock();
}
}

Expand Down Expand Up @@ -274,9 +320,16 @@ static List<StreamingPullRequest> partitionAckOperations(

public void updateStreamAckDeadline(int newAckDeadlineSeconds) {
messageDispatcher.setMessageDeadlineSeconds(newAckDeadlineSeconds);
requestObserver.onNext(
StreamingPullRequest.newBuilder()
.setStreamAckDeadlineSeconds(newAckDeadlineSeconds)
.build());
lock.lock();

This comment was marked as spam.

This comment was marked as spam.

try {
requestObserver.onNext(
StreamingPullRequest.newBuilder()
.setStreamAckDeadlineSeconds(newAckDeadlineSeconds)
.build());
} catch (Exception e) {
logger.log(Level.WARNING, "failed to set deadline", e);
} finally {
lock.unlock();
}
}
}