From d0e2d03bf2ebaacf35bebe5be907753e967b37b3 Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Fri, 5 Apr 2019 16:30:12 -0400 Subject: [PATCH] Change MessageDispatcher to be synchronous instead of asynchronous. This removes the failure mode described in #2452 that can occur when MaxOutstandingElementCount is low and there is more than one connection. In this case, it is possible for an individual MessageDispatcher to have no outstanding in-flight messages, but also be blocked by flow control with a whole new batch outstanding. In this case, it will never make progress on that batch since it will never receive another batch and the queue was made to not be shared in #4590, so the batch will never be pulled off by another MessageDispatcher. By changing this to use a blocking flow controller, this will never happen, as each batch will synchronously wait until it is allowed by flow control before being processed. --- .../cloud/pubsub/v1/MessageDispatcher.java | 95 ++++--------------- .../v1/StreamingSubscriberConnection.java | 34 +++---- .../google/cloud/pubsub/v1/Subscriber.java | 2 +- .../pubsub/v1/MessageDispatcherTest.java | 18 ++-- 4 files changed, 45 insertions(+), 104 deletions(-) diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index a851797500a0..acb2cf99952e 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -24,7 +24,6 @@ import com.google.api.gax.batching.FlowController; import com.google.api.gax.batching.FlowController.FlowControlException; import com.google.api.gax.core.Distribution; -import com.google.cloud.pubsub.v1.MessageDispatcher.OutstandingMessageBatch.OutstandingMessage; import com.google.common.primitives.Ints; import com.google.common.util.concurrent.MoreExecutors; import com.google.pubsub.v1.PubsubMessage; @@ -91,9 +90,6 @@ class MessageDispatcher { private final Lock jobLock; private ScheduledFuture backgroundJob; - private final LinkedBlockingDeque outstandingMessageBatches = - new LinkedBlockingDeque<>(); - // To keep track of number of seconds the receiver takes to process messages. private final Distribution ackLatencyDistribution; @@ -155,7 +151,6 @@ private void forget() { } flowController.release(1, outstandingBytes); messagesWaiter.incrementPendingMessages(-1); - processOutstandingBatches(); } @Override @@ -296,50 +291,19 @@ int getMessageDeadlineSeconds() { return messageDeadlineSeconds.get(); } - static class OutstandingMessageBatch { - private final Deque messages; - private final Runnable doneCallback; - - static class OutstandingMessage { - private final ReceivedMessage receivedMessage; - private final AckHandler ackHandler; - - public OutstandingMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) { - this.receivedMessage = receivedMessage; - this.ackHandler = ackHandler; - } - - public ReceivedMessage receivedMessage() { - return receivedMessage; - } - - public AckHandler ackHandler() { - return ackHandler; - } - } + static class OutstandingMessage { + private final ReceivedMessage receivedMessage; + private final AckHandler ackHandler; - public OutstandingMessageBatch(Runnable doneCallback) { - this.messages = new LinkedList<>(); - this.doneCallback = doneCallback; - } - - public void addMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) { - this.messages.add(new OutstandingMessage(receivedMessage, ackHandler)); - } - - public Deque messages() { - return messages; + public OutstandingMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) { + this.receivedMessage = receivedMessage; + this.ackHandler = ackHandler; } } - public void processReceivedMessages(List messages, Runnable doneCallback) { - if (messages.isEmpty()) { - doneCallback.run(); - return; - } - + public void processReceivedMessages(List messages) { Instant totalExpiration = now().plus(maxAckExtensionPeriod); - OutstandingMessageBatch outstandingBatch = new OutstandingMessageBatch(doneCallback); + List outstandingBatch = new ArrayList<>(messages.size()); for (ReceivedMessage message : messages) { AckHandler ackHandler = new AckHandler( @@ -355,42 +319,25 @@ public void processReceivedMessages(List messages, Runnable don // totally expire so that pubsub service sends us the message again. continue; } - outstandingBatch.addMessage(message, ackHandler); + outstandingBatch.add(new OutstandingMessage(message, ackHandler)); pendingReceipts.add(message.getAckId()); } - if (outstandingBatch.messages.isEmpty()) { - doneCallback.run(); - return; - } - - messagesWaiter.incrementPendingMessages(outstandingBatch.messages.size()); - outstandingMessageBatches.add(outstandingBatch); - processOutstandingBatches(); + processBatch(outstandingBatch); } - private void processOutstandingBatches() { - for (OutstandingMessageBatch nextBatch = outstandingMessageBatches.poll(); - nextBatch != null; - nextBatch = outstandingMessageBatches.poll()) { - for (OutstandingMessage nextMessage = nextBatch.messages.poll(); - nextMessage != null; - nextMessage = nextBatch.messages.poll()) { - try { - // This is a non-blocking flow controller. - flowController.reserve(1, nextMessage.receivedMessage.getMessage().getSerializedSize()); - } catch (FlowController.MaxOutstandingElementCountReachedException - | FlowController.MaxOutstandingRequestBytesReachedException flowControlException) { - // Unwind previous changes in the batches outstanding. - nextBatch.messages.addFirst(nextMessage); - outstandingMessageBatches.addFirst(nextBatch); - return; - } catch (FlowControlException unexpectedException) { - throw new IllegalStateException("Flow control unexpected exception", unexpectedException); - } - processOutstandingMessage(nextMessage.receivedMessage.getMessage(), nextMessage.ackHandler); + private void processBatch(List batch) { + messagesWaiter.incrementPendingMessages(batch.size()); + for (OutstandingMessage message : batch) { + // This is a blocking flow controller. We have already incremented MessageWaiter, so + // shutdown will block on processing of all these messages anyway. + try { + flowController.reserve(1, message.receivedMessage.getMessage().getSerializedSize()); + } catch (FlowControlException unexpectedException) { + // This should be a blocking flow controller and never throw an exception. + throw new IllegalStateException("Flow control unexpected exception", unexpectedException); } - nextBatch.doneCallback.run(); + processOutstandingMessage(message.receivedMessage.getMessage(), message.ackHandler); } } diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index ba12e97fb8a1..0f273c3429a8 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -151,26 +151,20 @@ public void onStart(StreamController controller) { @Override public void onResponse(StreamingPullResponse response) { channelReconnectBackoffMillis.set(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis()); - messageDispatcher.processReceivedMessages( - response.getReceivedMessagesList(), - new Runnable() { - @Override - public void run() { - // Only request more if we're not shutdown. - // If errorFuture is done, the stream has either failed or hung up, - // and we don't need to request. - if (isAlive() && !errorFuture.isDone()) { - lock.lock(); - try { - thisController.request(1); - } catch (Exception e) { - logger.log(Level.WARNING, "cannot request more messages", e); - } finally { - lock.unlock(); - } - } - } - }); + messageDispatcher.processReceivedMessages(response.getReceivedMessagesList()); + // Only request more if we're not shutdown. + // If errorFuture is done, the stream has either failed or hung up, + // and we don't need to request. + if (isAlive() && !errorFuture.isDone()) { + lock.lock(); + try { + thisController.request(1); + } catch (Exception e) { + logger.log(Level.WARNING, "cannot request more messages", e); + } finally { + lock.unlock(); + } + } } @Override diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index 70d17a4c9884..eb42ad82b7b6 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -130,7 +130,7 @@ private Subscriber(Builder builder) { builder .flowControlSettings .toBuilder() - .setLimitExceededBehavior(LimitExceededBehavior.ThrowException) + .setLimitExceededBehavior(LimitExceededBehavior.Block) .build()); this.numPullers = builder.parallelPullCount; diff --git a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java index 494945e028df..c4ca6e51a023 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java +++ b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java @@ -105,7 +105,7 @@ public void sendAckOperations( new FlowController( FlowControlSettings.newBuilder() .setMaxOutstandingElementCount(1L) - .setLimitExceededBehavior(FlowController.LimitExceededBehavior.ThrowException) + .setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block) .build()); dispatcher = @@ -124,7 +124,7 @@ public void sendAckOperations( @Test public void testReceipt() throws Exception { - dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE); + dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE)); dispatcher.processOutstandingAckOperations(); assertThat(sentModAcks) .contains(ModAckItem.of(TEST_MESSAGE.getAckId(), Subscriber.MIN_ACK_DEADLINE_SECONDS)); @@ -132,7 +132,7 @@ public void testReceipt() throws Exception { @Test public void testAck() throws Exception { - dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE); + dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE)); consumers.take().ack(); dispatcher.processOutstandingAckOperations(); assertThat(sentAcks).contains(TEST_MESSAGE.getAckId()); @@ -140,7 +140,7 @@ public void testAck() throws Exception { @Test public void testNack() throws Exception { - dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE); + dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE)); consumers.take().nack(); dispatcher.processOutstandingAckOperations(); assertThat(sentModAcks).contains(ModAckItem.of(TEST_MESSAGE.getAckId(), 0)); @@ -148,7 +148,7 @@ public void testNack() throws Exception { @Test public void testExtension() throws Exception { - dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE); + dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE)); dispatcher.extendDeadlines(); assertThat(sentModAcks) .contains(ModAckItem.of(TEST_MESSAGE.getAckId(), Subscriber.MIN_ACK_DEADLINE_SECONDS)); @@ -161,7 +161,7 @@ public void testExtension() throws Exception { @Test public void testExtension_Close() throws Exception { - dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE); + dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE)); dispatcher.extendDeadlines(); assertThat(sentModAcks) .contains(ModAckItem.of(TEST_MESSAGE.getAckId(), Subscriber.MIN_ACK_DEADLINE_SECONDS)); @@ -176,7 +176,7 @@ public void testExtension_Close() throws Exception { @Test public void testExtension_GiveUp() throws Exception { - dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE); + dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE)); dispatcher.extendDeadlines(); assertThat(sentModAcks) .contains(ModAckItem.of(TEST_MESSAGE.getAckId(), Subscriber.MIN_ACK_DEADLINE_SECONDS)); @@ -188,7 +188,7 @@ public void testExtension_GiveUp() throws Exception { dispatcher.extendDeadlines(); assertThat(sentModAcks).isEmpty(); - // We should be able to reserve another item in the flow controller and not block shutdown + // We should be able to reserve another item in the flow controller and not block. flowController.reserve(1, 0); dispatcher.stop(); } @@ -197,7 +197,7 @@ public void testExtension_GiveUp() throws Exception { public void testDeadlineAdjustment() throws Exception { assertThat(dispatcher.computeDeadlineSeconds()).isEqualTo(10); - dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE); + dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE)); clock.advance(42, TimeUnit.SECONDS); consumers.take().ack();