Skip to content

Commit

Permalink
Change MessageDispatcher to be synchronous instead of asynchronous.
Browse files Browse the repository at this point in the history
This removes the failure mode described in googleapis#2452 that can occur when MaxOutstandingElementCount is low and there is more than one connection.  In this case, it is possible for an individual MessageDispatcher to have no outstanding in-flight messages, but also be blocked by flow control with a whole new batch outstanding. In this case, it will never make progress on that batch since it will never receive another batch and the queue was made to not be shared in  googleapis#4590, so the batch will never be pulled off by another MessageDispatcher.

By changing this to use a blocking flow controller, this will never happen, as each batch will synchronously wait until it is allowed by flow control before being processed.
  • Loading branch information
dpcollins-google committed Apr 5, 2019
1 parent 80924d0 commit d0e2d03
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.batching.FlowController.FlowControlException;
import com.google.api.gax.core.Distribution;
import com.google.cloud.pubsub.v1.MessageDispatcher.OutstandingMessageBatch.OutstandingMessage;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.PubsubMessage;
Expand Down Expand Up @@ -91,9 +90,6 @@ class MessageDispatcher {
private final Lock jobLock;
private ScheduledFuture<?> backgroundJob;

private final LinkedBlockingDeque<OutstandingMessageBatch> outstandingMessageBatches =
new LinkedBlockingDeque<>();

// To keep track of number of seconds the receiver takes to process messages.
private final Distribution ackLatencyDistribution;

Expand Down Expand Up @@ -155,7 +151,6 @@ private void forget() {
}
flowController.release(1, outstandingBytes);
messagesWaiter.incrementPendingMessages(-1);
processOutstandingBatches();
}

@Override
Expand Down Expand Up @@ -296,50 +291,19 @@ int getMessageDeadlineSeconds() {
return messageDeadlineSeconds.get();
}

static class OutstandingMessageBatch {
private final Deque<OutstandingMessage> messages;
private final Runnable doneCallback;

static class OutstandingMessage {
private final ReceivedMessage receivedMessage;
private final AckHandler ackHandler;

public OutstandingMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) {
this.receivedMessage = receivedMessage;
this.ackHandler = ackHandler;
}

public ReceivedMessage receivedMessage() {
return receivedMessage;
}

public AckHandler ackHandler() {
return ackHandler;
}
}
static class OutstandingMessage {
private final ReceivedMessage receivedMessage;
private final AckHandler ackHandler;

public OutstandingMessageBatch(Runnable doneCallback) {
this.messages = new LinkedList<>();
this.doneCallback = doneCallback;
}

public void addMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) {
this.messages.add(new OutstandingMessage(receivedMessage, ackHandler));
}

public Deque<OutstandingMessage> messages() {
return messages;
public OutstandingMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) {
this.receivedMessage = receivedMessage;
this.ackHandler = ackHandler;
}
}

public void processReceivedMessages(List<ReceivedMessage> messages, Runnable doneCallback) {
if (messages.isEmpty()) {
doneCallback.run();
return;
}

public void processReceivedMessages(List<ReceivedMessage> messages) {
Instant totalExpiration = now().plus(maxAckExtensionPeriod);
OutstandingMessageBatch outstandingBatch = new OutstandingMessageBatch(doneCallback);
List<OutstandingMessage> outstandingBatch = new ArrayList<>(messages.size());
for (ReceivedMessage message : messages) {
AckHandler ackHandler =
new AckHandler(
Expand All @@ -355,42 +319,25 @@ public void processReceivedMessages(List<ReceivedMessage> messages, Runnable don
// totally expire so that pubsub service sends us the message again.
continue;
}
outstandingBatch.addMessage(message, ackHandler);
outstandingBatch.add(new OutstandingMessage(message, ackHandler));
pendingReceipts.add(message.getAckId());
}

if (outstandingBatch.messages.isEmpty()) {
doneCallback.run();
return;
}

messagesWaiter.incrementPendingMessages(outstandingBatch.messages.size());
outstandingMessageBatches.add(outstandingBatch);
processOutstandingBatches();
processBatch(outstandingBatch);
}

private void processOutstandingBatches() {
for (OutstandingMessageBatch nextBatch = outstandingMessageBatches.poll();
nextBatch != null;
nextBatch = outstandingMessageBatches.poll()) {
for (OutstandingMessage nextMessage = nextBatch.messages.poll();
nextMessage != null;
nextMessage = nextBatch.messages.poll()) {
try {
// This is a non-blocking flow controller.
flowController.reserve(1, nextMessage.receivedMessage.getMessage().getSerializedSize());
} catch (FlowController.MaxOutstandingElementCountReachedException
| FlowController.MaxOutstandingRequestBytesReachedException flowControlException) {
// Unwind previous changes in the batches outstanding.
nextBatch.messages.addFirst(nextMessage);
outstandingMessageBatches.addFirst(nextBatch);
return;
} catch (FlowControlException unexpectedException) {
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
}
processOutstandingMessage(nextMessage.receivedMessage.getMessage(), nextMessage.ackHandler);
private void processBatch(List<OutstandingMessage> batch) {
messagesWaiter.incrementPendingMessages(batch.size());
for (OutstandingMessage message : batch) {
// This is a blocking flow controller. We have already incremented MessageWaiter, so
// shutdown will block on processing of all these messages anyway.
try {
flowController.reserve(1, message.receivedMessage.getMessage().getSerializedSize());
} catch (FlowControlException unexpectedException) {
// This should be a blocking flow controller and never throw an exception.
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
}
nextBatch.doneCallback.run();
processOutstandingMessage(message.receivedMessage.getMessage(), message.ackHandler);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,26 +151,20 @@ public void onStart(StreamController controller) {
@Override
public void onResponse(StreamingPullResponse response) {
channelReconnectBackoffMillis.set(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis());
messageDispatcher.processReceivedMessages(
response.getReceivedMessagesList(),
new Runnable() {
@Override
public void run() {
// Only request more if we're not shutdown.
// If errorFuture is done, the stream has either failed or hung up,
// and we don't need to request.
if (isAlive() && !errorFuture.isDone()) {
lock.lock();
try {
thisController.request(1);
} catch (Exception e) {
logger.log(Level.WARNING, "cannot request more messages", e);
} finally {
lock.unlock();
}
}
}
});
messageDispatcher.processReceivedMessages(response.getReceivedMessagesList());
// Only request more if we're not shutdown.
// If errorFuture is done, the stream has either failed or hung up,
// and we don't need to request.
if (isAlive() && !errorFuture.isDone()) {
lock.lock();
try {
thisController.request(1);
} catch (Exception e) {
logger.log(Level.WARNING, "cannot request more messages", e);
} finally {
lock.unlock();
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ private Subscriber(Builder builder) {
builder
.flowControlSettings
.toBuilder()
.setLimitExceededBehavior(LimitExceededBehavior.ThrowException)
.setLimitExceededBehavior(LimitExceededBehavior.Block)
.build());

this.numPullers = builder.parallelPullCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public void sendAckOperations(
new FlowController(
FlowControlSettings.newBuilder()
.setMaxOutstandingElementCount(1L)
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.ThrowException)
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
.build());

dispatcher =
Expand All @@ -124,31 +124,31 @@ public void sendAckOperations(

@Test
public void testReceipt() throws Exception {
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE);
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE));
dispatcher.processOutstandingAckOperations();
assertThat(sentModAcks)
.contains(ModAckItem.of(TEST_MESSAGE.getAckId(), Subscriber.MIN_ACK_DEADLINE_SECONDS));
}

@Test
public void testAck() throws Exception {
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE);
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE));
consumers.take().ack();
dispatcher.processOutstandingAckOperations();
assertThat(sentAcks).contains(TEST_MESSAGE.getAckId());
}

@Test
public void testNack() throws Exception {
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE);
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE));
consumers.take().nack();
dispatcher.processOutstandingAckOperations();
assertThat(sentModAcks).contains(ModAckItem.of(TEST_MESSAGE.getAckId(), 0));
}

@Test
public void testExtension() throws Exception {
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE);
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE));
dispatcher.extendDeadlines();
assertThat(sentModAcks)
.contains(ModAckItem.of(TEST_MESSAGE.getAckId(), Subscriber.MIN_ACK_DEADLINE_SECONDS));
Expand All @@ -161,7 +161,7 @@ public void testExtension() throws Exception {

@Test
public void testExtension_Close() throws Exception {
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE);
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE));
dispatcher.extendDeadlines();
assertThat(sentModAcks)
.contains(ModAckItem.of(TEST_MESSAGE.getAckId(), Subscriber.MIN_ACK_DEADLINE_SECONDS));
Expand All @@ -176,7 +176,7 @@ public void testExtension_Close() throws Exception {

@Test
public void testExtension_GiveUp() throws Exception {
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE);
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE));
dispatcher.extendDeadlines();
assertThat(sentModAcks)
.contains(ModAckItem.of(TEST_MESSAGE.getAckId(), Subscriber.MIN_ACK_DEADLINE_SECONDS));
Expand All @@ -188,7 +188,7 @@ public void testExtension_GiveUp() throws Exception {
dispatcher.extendDeadlines();
assertThat(sentModAcks).isEmpty();

// We should be able to reserve another item in the flow controller and not block shutdown
// We should be able to reserve another item in the flow controller and not block.
flowController.reserve(1, 0);
dispatcher.stop();
}
Expand All @@ -197,7 +197,7 @@ public void testExtension_GiveUp() throws Exception {
public void testDeadlineAdjustment() throws Exception {
assertThat(dispatcher.computeDeadlineSeconds()).isEqualTo(10);

dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE);
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE));
clock.advance(42, TimeUnit.SECONDS);
consumers.take().ack();

Expand Down

0 comments on commit d0e2d03

Please sign in to comment.