From efcc5158724f69eb28e09911a8e529cd6ba13aae Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Fri, 8 Sep 2017 15:34:18 +1000 Subject: [PATCH 1/5] pubsub: fix race condition in streaming connection --- .../v1/StreamingSubscriberConnection.java | 114 +++++++++++++----- 1 file changed, 82 insertions(+), 32 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index b3a66e3cf39e..54cd83053b5a 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -38,6 +38,8 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -52,14 +54,15 @@ final class StreamingSubscriberConnection extends AbstractApiService implements private static final Duration MAX_CHANNEL_RECONNECT_BACKOFF = Duration.ofSeconds(10); private static final int MAX_PER_REQUEST_CHANGES = 10000; - private final AtomicLong channelReconnectBackoffMillis = - new AtomicLong(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis()); - private final SubscriberStub asyncStub; - private final String subscription; private final ScheduledExecutorService executor; private final MessageDispatcher messageDispatcher; + + private final AtomicLong channelReconnectBackoffMillis = + new AtomicLong(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis()); + + private final Lock lock = new ReentrantLock(); private ClientCallStreamObserver requestObserver; public StreamingSubscriberConnection( @@ -101,14 +104,21 @@ protected void doStart() { @Override 
protected void doStop() { messageDispatcher.stop(); - requestObserver.onError(Status.CANCELLED.asException()); - notifyStopped(); + + lock.lock(); + try { + requestObserver.onError(Status.CANCELLED.asException()); + } finally { + lock.unlock(); + notifyStopped(); + } } private class StreamingPullResponseObserver implements ClientResponseObserver { final SettableFuture errorFuture; + ClientCallStreamObserver thisRequestObserver; StreamingPullResponseObserver(SettableFuture errorFuture) { this.errorFuture = errorFuture; @@ -116,8 +126,13 @@ private class StreamingPullResponseObserver @Override public void beforeStart(ClientCallStreamObserver requestObserver) { - StreamingSubscriberConnection.this.requestObserver = requestObserver; - requestObserver.disableAutoInboundFlowControl(); + thisRequestObserver = requestObserver; + lock.lock(); + try { + requestObserver.disableAutoInboundFlowControl(); + } finally { + lock.unlock(); + } } @Override @@ -128,9 +143,22 @@ public void onNext(StreamingPullResponse response) { new Runnable() { @Override public void run() { - // Only if not shutdown we will request one more batches of messages to be delivered. + /** + * Only if not shutdown we will request one more batches of messages to be delivered. + * + *

We use the request observer we're paired with, not the "current observer" in the + * outer class. By the time this Runnable is called, the stream might have already + * failed and the outer class might have created a new stream. In this case, we should + * request more messages from the already-failed stream (a no-op), otherwise the + * current observer might get more messages than it should. + */ if (isAlive()) { - requestObserver.request(1); + lock.lock(); + try { + thisRequestObserver.request(1); + } finally { + lock.unlock(); + } } } }); @@ -166,6 +194,18 @@ private void initialize() { .build()); requestObserver.request(1); + /** + * Must make sure we do this after sending the subscription name and deadline. Otherwise, some + * other thread might use this stream to do something else before we could send the first + * request. + */ + lock.lock(); + try { + this.requestObserver = requestObserver; + } finally { + lock.unlock(); + } + Futures.addCallback( errorFuture, new FutureCallback() { @@ -188,24 +228,24 @@ public void onFailure(Throwable cause) { return; } logger.log(Level.WARNING, "Terminated streaming with exception", cause); - if (StatusUtil.isRetryable(cause)) { - long backoffMillis = channelReconnectBackoffMillis.get(); - long newBackoffMillis = - Math.min(backoffMillis * 2, MAX_CHANNEL_RECONNECT_BACKOFF.toMillis()); - channelReconnectBackoffMillis.set(newBackoffMillis); - - executor.schedule( - new Runnable() { - @Override - public void run() { - initialize(); - } - }, - backoffMillis, - TimeUnit.MILLISECONDS); - } else { + if (!StatusUtil.isRetryable(cause)) { notifyFailed(cause); + return; } + long backoffMillis = channelReconnectBackoffMillis.get(); + long newBackoffMillis = + Math.min(backoffMillis * 2, MAX_CHANNEL_RECONNECT_BACKOFF.toMillis()); + channelReconnectBackoffMillis.set(newBackoffMillis); + + executor.schedule( + new Runnable() { + @Override + public void run() { + initialize(); + } + }, + backoffMillis, + TimeUnit.MILLISECONDS); } }, 
executor); @@ -220,8 +260,13 @@ public void sendAckOperations( List acksToSend, List ackDeadlineExtensions) { List requests = partitionAckOperations(acksToSend, ackDeadlineExtensions, MAX_PER_REQUEST_CHANGES); - for (StreamingPullRequest request : requests) { - requestObserver.onNext(request); + lock.lock(); + try { + for (StreamingPullRequest request : requests) { + requestObserver.onNext(request); + } + } finally { + lock.unlock(); } } @@ -271,9 +316,14 @@ static List partitionAckOperations( public void updateStreamAckDeadline(int newAckDeadlineSeconds) { messageDispatcher.setMessageDeadlineSeconds(newAckDeadlineSeconds); - requestObserver.onNext( - StreamingPullRequest.newBuilder() - .setStreamAckDeadlineSeconds(newAckDeadlineSeconds) - .build()); + lock.lock(); + try { + requestObserver.onNext( + StreamingPullRequest.newBuilder() + .setStreamAckDeadlineSeconds(newAckDeadlineSeconds) + .build()); + } finally { + lock.unlock(); + } } } From 4817c18bc55b222e82b7216ec53add37bc6e750b Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Mon, 11 Sep 2017 14:28:28 +1000 Subject: [PATCH 2/5] pr comment --- .../v1/StreamingSubscriberConnection.java | 25 ++++++++----------- 1 file changed, 10 insertions(+), 15 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index 54cd83053b5a..43d978e285b0 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -118,6 +118,14 @@ private class StreamingPullResponseObserver implements ClientResponseObserver { final SettableFuture errorFuture; + + /** + * When a batch finishes processing, we want to request one more batch from the server.
But by + * the time this happens, our stream might have already errored, and new stream created. We + * don't want to request more batches from the new stream -- that might pull more messages than + * the user can deal with -- so we save the request observer this response observer is "paired + * with". If the stream has already errored, requesting more messages is a no-op. + */ ClientCallStreamObserver thisRequestObserver; StreamingPullResponseObserver(SettableFuture errorFuture) { @@ -127,12 +135,7 @@ private class StreamingPullResponseObserver @Override public void beforeStart(ClientCallStreamObserver requestObserver) { thisRequestObserver = requestObserver; - lock.lock(); - try { - requestObserver.disableAutoInboundFlowControl(); - } finally { - lock.unlock(); - } + requestObserver.disableAutoInboundFlowControl(); } @Override @@ -143,15 +146,7 @@ public void onNext(StreamingPullResponse response) { new Runnable() { @Override public void run() { - /** - * Only if not shutdown we will request one more batches of messages to be delivered. - * - *

We use the request observer we're paired with, not the "current observer" in the - * outer class. By the time this Runnable is called, the stream might have already - * failed and the outer class might have created a new stream. In this case, we should - * request more messages from the already-failed stream (a no-op), otherwise the - * current observer might get more messages than it should. - */ + // Only request more if we're not shutdown. if (isAlive()) { lock.lock(); try { From 9affeaaf3136d2d3f14c74b855ef0867ff4c7aae Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Wed, 13 Sep 2017 13:35:30 +1000 Subject: [PATCH 3/5] pr comment --- .../cloud/pubsub/v1/StreamingSubscriberConnection.java | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index 43d978e285b0..46b9d1f40afd 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -148,12 +148,7 @@ public void onNext(StreamingPullResponse response) { public void run() { // Only request more if we're not shutdown. 
if (isAlive()) { - lock.lock(); - try { - thisRequestObserver.request(1); - } finally { - lock.unlock(); - } + thisRequestObserver.request(1); } } }); From 68e071bea543ac09790e047e1ad53db833b3d4cf Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Fri, 15 Sep 2017 13:10:18 +1000 Subject: [PATCH 4/5] wip --- .../pubsub/v1/StreamingSubscriberConnection.java | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index 46b9d1f40afd..12557757d355 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -147,8 +147,17 @@ public void onNext(StreamingPullResponse response) { @Override public void run() { // Only request more if we're not shutdown. - if (isAlive()) { - thisRequestObserver.request(1); + // If errorFuture is done, the stream has either failed or hung up, + // and we don't need to request. 
+ if (isAlive() && !errorFuture.isDone()) { + lock.lock(); + try { + thisRequestObserver.request(1); + } catch (Exception e) { + logger.log(Level.WARNING, "cannot request more messages", e); + } finally { + lock.unlock(); + } } } }); From b88bc1e6b56beb4b31eb79c93c9105cfe1998e70 Mon Sep 17 00:00:00 2001 From: Michael Darakananda Date: Thu, 28 Sep 2017 15:20:38 +1000 Subject: [PATCH 5/5] log errors --- .../google/cloud/pubsub/v1/StreamingSubscriberConnection.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index 0706e90fd54a..552706973ee0 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -267,6 +267,8 @@ public void sendAckOperations( for (StreamingPullRequest request : requests) { requestObserver.onNext(request); } + } catch (Exception e) { + logger.log(Level.WARNING, "failed to send acks", e); } finally { lock.unlock(); } @@ -324,6 +326,8 @@ public void updateStreamAckDeadline(int newAckDeadlineSeconds) { StreamingPullRequest.newBuilder() .setStreamAckDeadlineSeconds(newAckDeadlineSeconds) .build()); + } catch (Exception e) { + logger.log(Level.WARNING, "failed to set deadline", e); } finally { lock.unlock(); }