From a8812093de2ad6ce17693eaa42f2e8360ad5fd8c Mon Sep 17 00:00:00 2001 From: Marco Ziccardi Date: Fri, 3 Jun 2016 17:42:38 +0200 Subject: [PATCH] Implement Iterator pull methods, add javadoc and tests --- .../java/com/google/cloud/pubsub/PubSub.java | 48 ++ .../com/google/cloud/pubsub/PubSubImpl.java | 64 ++- .../cloud/pubsub/spi/DefaultPubSubRpc.java | 6 + .../cloud/pubsub/AckDeadlineRenewerTest.java | 5 + .../google/cloud/pubsub/BaseSystemTest.java | 216 +++++++++ .../google/cloud/pubsub/PubSubImplTest.java | 419 +++++++++++------- 6 files changed, 591 insertions(+), 167 deletions(-) diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java index 428f74b64024..f86817c26345 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSub.java @@ -381,8 +381,56 @@ interface MessageConsumer extends AutoCloseable { */ Future> listSubscriptionsAsync(String topic, ListOption... options); + /** + * Pulls messages from the provided subscription. This method possibly returns no messages if no + * message was available at the time the request was processed by the Pub/Sub service (i.e. the + * system is not allowed to wait until at least one message is available). Pulled messages have + * their acknowledge deadline automatically renewed until they are explicitly consumed using + * {@link Iterator#next()}. + * + *

Example usage of synchronous message pulling: + *

 {@code
+   * Iterator messageIterator = pubsub.pull("subscription", 100);
+   * while (messageIterator.hasNext()) {
+   *   ReceivedMessage message = messageIterator.next();
+   *   // message's acknowledge deadline is no longer automatically renewed. If processing takes
+   *   // long pubsub.modifyAckDeadline(String, String, long, TimeUnit) can be used to extend it.
+   *   doSomething(message);
+   *   message.ack(); // or message.nack()
+   * }}
+ * + * @param subscription the subscription from which to pull messages + * @param maxMessages the maximum number of messages pulled by this method. This method can + * possibly return fewer messages. + * @throws PubSubException upon failure + */ Iterator pull(String subscription, int maxMessages); + /** + * Sends a request for pulling messages from the provided subscription. This method returns a + * {@code Future} object to consume the result. {@link Future#get()} returns a message iterator. + * This method possibly returns no messages if no message was available at the time the request + * was processed by the Pub/Sub service (i.e. the system is not allowed to wait until at least one + * message is available). + * + *

Example usage of asynchronous message pulling: + *

 {@code
+   * Future> future = pubsub.pull("subscription", 100);
+   * // do something while the request gets processed
+   * Iterator messageIterator = future.get();
+   * while (messageIterator.hasNext()) {
+   *   ReceivedMessage message = messageIterator.next();
+   *   // message's acknowledge deadline is no longer automatically renewed. If processing takes
+   *   // long pubsub.modifyAckDeadline(String, String, long, TimeUnit) can be used to extend it.
+   *   doSomething(message);
+   *   message.ack(); // or message.nack()
+   * }}
+ * + * @param subscription the subscription from which to pull messages + * @param maxMessages the maximum number of messages pulled by this method. This method can + * possibly return fewer messages. + * @throws PubSubException upon failure + */ Future> pullAsync(String subscription, int maxMessages); MessageConsumer pullAsync(String subscription, MessageProcessor callback, PullOption... options); diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java index 042f76b5197c..800d26c41ce9 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/PubSubImpl.java @@ -16,9 +16,9 @@ package com.google.cloud.pubsub; +import static com.google.api.client.util.Preconditions.checkArgument; import static com.google.cloud.pubsub.PubSub.ListOption.OptionType.PAGE_SIZE; import static com.google.cloud.pubsub.PubSub.ListOption.OptionType.PAGE_TOKEN; -import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.util.concurrent.Futures.lazyTransform; import com.google.cloud.AsyncPage; @@ -29,10 +29,12 @@ import com.google.cloud.pubsub.spi.PubSubRpc; import com.google.cloud.pubsub.spi.v1.PublisherApi; import com.google.cloud.pubsub.spi.v1.SubscriberApi; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.util.concurrent.Uninterruptibles; @@ -52,6 +54,8 @@ import com.google.pubsub.v1.ModifyPushConfigRequest; import com.google.pubsub.v1.PublishRequest; import com.google.pubsub.v1.PublishResponse; +import com.google.pubsub.v1.PullRequest; +import com.google.pubsub.v1.PullResponse; import java.util.Collections; import java.util.Iterator; @@ -64,6 +68,8 @@ class PubSubImpl extends BaseService implements PubSub { private final PubSubRpc rpc; + private final AckDeadlineRenewer ackDeadlineRenewer; + private boolean closed; private static final Function EMPTY_TO_VOID_FUNCTION = new Function() { @Override @@ -78,10 +84,25 @@ public Boolean apply(Empty input) { return input != null; } }; + private static final Function + MESSAGE_TO_ACK_ID_FUNCTION = new Function() { + @Override + public String apply(com.google.pubsub.v1.ReceivedMessage message) { + return message.getAckId(); + } + }; PubSubImpl(PubSubOptions options) { super(options); rpc = options.rpc(); + ackDeadlineRenewer = new AckDeadlineRenewer(this); + } + + @VisibleForTesting + PubSubImpl(PubSubOptions options, AckDeadlineRenewer ackDeadlineRenewer) { + super(options); + rpc = options.rpc(); + this.ackDeadlineRenewer = ackDeadlineRenewer; } private abstract static class BasePageFetcher implements AsyncPageImpl.NextPageFetcher { @@ -445,17 +466,35 @@ public Future> listSubscriptionsAsync(String topic, @Override public Iterator pull(String subscription, int maxMessages) { - // this should set return_immediately to true - return null; + return get(pullAsync(subscription, maxMessages)); } @Override - public Future> pullAsync(String subscription, int maxMessages) { - // though this method can set return_immediately to false (as future can be canceled) I - // suggest to keep it false so sync could delegate to asyc and use the same options - // this method also should use the VTKIT thread-pool to renew ack deadline for non consumed - // messages - return null; + public Future> pullAsync(final String subscription, int maxMessages) { + PullRequest request = PullRequest.newBuilder().setReturnImmediately(true) + .setSubscription(SubscriberApi.formatSubscriptionName(options().projectId(), subscription)) + .setMaxMessages(maxMessages) + .setReturnImmediately(true) + .build(); + Future response = rpc.pull(request); + return lazyTransform(response, new Function>() { + @Override + public Iterator apply(PullResponse pullResponse) { + // Add all received messages to the automatic ack deadline renewer + List ackIds = Lists.transform(pullResponse.getReceivedMessagesList(), + MESSAGE_TO_ACK_ID_FUNCTION); + ackDeadlineRenewer.add(subscription, ackIds); + return Iterators.transform(pullResponse.getReceivedMessagesList().iterator(), + new Function() { + @Override + public ReceivedMessage apply(com.google.pubsub.v1.ReceivedMessage receivedMessage) { + // Remove consumed message from automatic ack deadline renewer + ackDeadlineRenewer.remove(subscription, receivedMessage.getAckId()); + return ReceivedMessage.fromPb(PubSubImpl.this, subscription, receivedMessage); + } + }); + } + }); } @Override @@ -549,6 +588,13 @@ public Future modifyAckDeadlineAsync(String subscription, int deadline, Ti @Override public void close() throws Exception { + if (closed) { + return; + } + closed = true; rpc.close(); + if (ackDeadlineRenewer != null) { + ackDeadlineRenewer.close(); + } } } diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java index 7df6cb2632f5..4ac3837088f8 100644 --- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java +++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/spi/DefaultPubSubRpc.java @@ -73,6 +73,8 @@ public class DefaultPubSubRpc implements PubSubRpc { private final ScheduledExecutorService executor; private final ExecutorFactory executorFactory; + private boolean closed; + private static final class InternalPubSubOptions extends PubSubOptions { private static final long serialVersionUID = -7997372049256706185L; @@ -233,6 +235,10 @@ public Future modify(ModifyPushConfigRequest request) { @Override public void close() throws Exception { + if (closed) { + return; + } + closed = true; subscriberApi.close(); publisherApi.close(); executorFactory.release(executor); diff --git a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/AckDeadlineRenewerTest.java b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/AckDeadlineRenewerTest.java index 07cc70c6e30d..bf912e0a79f7 100644 --- a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/AckDeadlineRenewerTest.java +++ b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/AckDeadlineRenewerTest.java @@ -25,7 +25,9 @@ import org.easymock.IAnswer; import org.junit.After; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.Timeout; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; @@ -47,6 +49,9 @@ public class AckDeadlineRenewerTest { private PubSub pubsub; private AckDeadlineRenewer ackDeadlineRenewer; + @Rule + public Timeout globalTimeout = Timeout.seconds(60); + @Before public void setUp() { pubsub = EasyMock.createStrictMock(PubSub.class); diff --git a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java index 3360632b45ce..8eb03a7de073 100644 --- a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java +++ b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/BaseSystemTest.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import com.google.api.client.util.Lists; import com.google.cloud.AsyncPage; import com.google.cloud.Page; import com.google.common.collect.ImmutableList; @@ -37,6 +38,7 @@ import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; /** * A base class for system tests. This class can be extended to run system tests in different @@ -428,4 +430,218 @@ public void testListSubscriptionsAsync() throws ExecutionException, InterruptedE assertTrue(subscription2.delete()); assertTrue(subscription3.delete()); } + + @Test + public void testPullMessages() { + String topic = formatForTest("test-pull-messages-topic"); + pubsub().create(TopicInfo.of(topic)); + String subscription = formatForTest("test-pull-messages-subscription"); + pubsub().create(SubscriptionInfo.of(topic, subscription)); + Message message1 = Message.of("payload1"); + Message message2 = Message.of("payload2"); + List messageIds = pubsub().publish(topic, ImmutableList.of(message1, message2)); + assertEquals(2, messageIds.size()); + Iterator iterator = pubsub().pull(subscription, 2); + assertEquals(message1.payloadAsString(), iterator.next().payloadAsString()); + assertEquals(message2.payloadAsString(), iterator.next().payloadAsString()); + assertTrue(pubsub().deleteSubscription(subscription)); + assertTrue(pubsub().deleteTopic(topic)); + } + + @Test + public void testPullMessagesAndAutoRenewDeadline() throws InterruptedException { + String topic = formatForTest("test-pull-messages-and-renew-deadline-topic"); + pubsub().create(TopicInfo.of(topic)); + String subscription = formatForTest("test-pull-messages-and-renew-deadline-subscription"); + pubsub().create(SubscriptionInfo.builder(topic, subscription).ackDeadLineSeconds(10).build()); + Message message1 = Message.of("payload1"); + Message message2 = Message.of("payload2"); + // todo(mziccard): use batch publish if #1017 gets fixed, or remove this comment + pubsub().publish(topic, message1); + pubsub().publish(topic, message2); + Iterator iterator = pubsub().pull(subscription, 2); + ReceivedMessage consumedMessage = iterator.next(); + Thread.sleep(15000); + // first message was consumed while second message is still being renewed + Iterator nextIterator = pubsub().pull(subscription, 2); + assertTrue(nextIterator.hasNext()); + ReceivedMessage message = nextIterator.next(); + assertEquals(consumedMessage.payloadAsString(), message.payloadAsString()); + assertFalse(nextIterator.hasNext()); + consumedMessage.ack(); + iterator.next().ack(); + nextIterator = pubsub().pull(subscription, 2); + assertFalse(nextIterator.hasNext()); + assertTrue(pubsub().deleteSubscription(subscription)); + assertTrue(pubsub().deleteTopic(topic)); + } + + @Test + public void testPullMessagesAndModifyAckDeadline() throws InterruptedException { + String topic = formatForTest("test-pull-messages-and-modify-deadline-topic"); + pubsub().create(TopicInfo.of(topic)); + String subscription = formatForTest("test-pull-messages-and-modify-deadline-subscription"); + pubsub().create(SubscriptionInfo.builder(topic, subscription).ackDeadLineSeconds(10).build()); + Message message1 = Message.of("payload1"); + Message message2 = Message.of("payload2"); + // todo(mziccard): use batch publish if #1017 gets fixed, or remove this comment + pubsub().publish(topic, message1); + pubsub().publish(topic, message2); + // Consume all messages and stop ack renewal + List receivedMessages = Lists.newArrayList(pubsub().pull(subscription, 2)); + receivedMessages.get(0).modifyAckDeadline(60, TimeUnit.SECONDS); + Thread.sleep(15000); + // first message was renewed while second message should still be sent on pull requests + Iterator nextIterator = pubsub().pull(subscription, 2); + assertTrue(nextIterator.hasNext()); + ReceivedMessage message = nextIterator.next(); + assertEquals(receivedMessages.get(1).payloadAsString(), message.payloadAsString()); + assertFalse(nextIterator.hasNext()); + assertTrue(pubsub().deleteSubscription(subscription)); + assertTrue(pubsub().deleteTopic(topic)); + } + + @Test + public void testPullNonExistingSubscription() { + thrown.expect(PubSubException.class); + pubsub().pull(formatForTest("non-existing-subscription"), 2); + } + + @Test + public void testPullMessagesAsync() throws ExecutionException, InterruptedException { + String topic = formatForTest("test-pull-messages-async-topic"); + pubsub().create(TopicInfo.of(topic)); + String subscription = formatForTest("test-pull-messages-async-subscription"); + pubsub().create(SubscriptionInfo.of(topic, subscription)); + Message message1 = Message.of("payload1"); + Message message2 = Message.of("payload2"); + List messageIds = pubsub().publish(topic, ImmutableList.of(message1, message2)); + assertEquals(2, messageIds.size()); + Iterator iterator = pubsub().pullAsync(subscription, 2).get(); + assertEquals(message1.payloadAsString(), iterator.next().payloadAsString()); + assertEquals(message2.payloadAsString(), iterator.next().payloadAsString()); + assertTrue(pubsub().deleteSubscription(subscription)); + assertTrue(pubsub().deleteTopic(topic)); + } + + @Test + public void testPullAsyncNonExistingSubscription() + throws ExecutionException, InterruptedException { + thrown.expect(ExecutionException.class); + pubsub().pullAsync(formatForTest("non-existing-subscription"), 2).get(); + } + + @Test + public void testAckAndNackOneMessage() { + String topic = formatForTest("test-ack-one-message-topic"); + pubsub().create(TopicInfo.of(topic)); + String subscription = formatForTest("test-ack-one-message-subscription"); + pubsub().create(SubscriptionInfo.of(topic, subscription)); + Message message = Message.of("payload"); + assertNotNull(pubsub().publish(topic, message)); + Iterator receivedMessages = pubsub().pull(subscription, 1); + receivedMessages.next().nack(); + receivedMessages = pubsub().pull(subscription, 1); + receivedMessages.next().ack(); + assertFalse(pubsub().pull(subscription, 1).hasNext()); + assertTrue(pubsub().deleteSubscription(subscription)); + assertTrue(pubsub().deleteTopic(topic)); + } + + @Test + public void testAckAndNackOneMessageAsync() throws ExecutionException, InterruptedException { + String topic = formatForTest("test-ack-one-message-async-topic"); + pubsub().create(TopicInfo.of(topic)); + String subscription = formatForTest("test-ack-one-message-async-subscription"); + pubsub().create(SubscriptionInfo.of(topic, subscription)); + Message message = Message.of("payload"); + assertNotNull(pubsub().publish(topic, message)); + Iterator receivedMessages = pubsub().pull(subscription, 1); + receivedMessages.next().nackAsync().get(); + receivedMessages = pubsub().pull(subscription, 1); + receivedMessages.next().ackAsync().get(); + assertFalse(pubsub().pull(subscription, 1).hasNext()); + assertTrue(pubsub().deleteSubscription(subscription)); + assertTrue(pubsub().deleteTopic(topic)); + } + + @Test + public void testAckAndNackMoreMessages() throws ExecutionException, InterruptedException { + String topic = formatForTest("test-ack-more-messages-topic"); + pubsub().create(TopicInfo.of(topic)); + String subscription = formatForTest("test-ack-more-messages-subscription"); + pubsub().create(SubscriptionInfo.of(topic, subscription)); + Message message1 = Message.of("payload1"); + Message message2 = Message.of("payload2"); + assertNotNull(pubsub().publish(topic, message1, message2)); + Iterator receivedMessages = pubsub().pull(subscription, 2); + pubsub().nack(subscription, receivedMessages.next().ackId(), receivedMessages.next().ackId()); + receivedMessages = pubsub().pull(subscription, 2); + pubsub().ack(subscription, receivedMessages.next().ackId(), receivedMessages.next().ackId()); + assertFalse(pubsub().pull(subscription, 2).hasNext()); + assertTrue(pubsub().deleteSubscription(subscription)); + assertTrue(pubsub().deleteTopic(topic)); + } + + @Test + public void testAckAndNackMoreMessagesAsync() throws ExecutionException, InterruptedException { + String topic = formatForTest("test-ack-more-messages-async-topic"); + pubsub().create(TopicInfo.of(topic)); + String subscription = formatForTest("test-ack-more-messages-async-subscription"); + pubsub().create(SubscriptionInfo.of(topic, subscription)); + Message message1 = Message.of("payload1"); + Message message2 = Message.of("payload2"); + assertNotNull(pubsub().publish(topic, message1, message2)); + Iterator receivedMessages = pubsub().pull(subscription, 2); + pubsub() + .nackAsync(subscription, receivedMessages.next().ackId(), receivedMessages.next().ackId()) + .get(); + receivedMessages = pubsub().pull(subscription, 2); + pubsub() + .ackAsync(subscription, receivedMessages.next().ackId(), receivedMessages.next().ackId()) + .get(); + assertFalse(pubsub().pull(subscription, 2).hasNext()); + assertTrue(pubsub().deleteSubscription(subscription)); + assertTrue(pubsub().deleteTopic(topic)); + } + + @Test + public void testAckAndNackMessageList() throws ExecutionException, InterruptedException { + String topic = formatForTest("test-ack-message-list-topic"); + pubsub().create(TopicInfo.of(topic)); + String subscription = formatForTest("test-ack-message-list-subscription"); + pubsub().create(SubscriptionInfo.of(topic, subscription)); + Message message1 = Message.of("payload1"); + Message message2 = Message.of("payload2"); + assertNotNull(pubsub().publish(topic, ImmutableList.of(message1, message2))); + Iterator receivedMessages = pubsub().pull(subscription, 2); + pubsub().nack(subscription, + ImmutableList.of(receivedMessages.next().ackId(), receivedMessages.next().ackId())); + receivedMessages = pubsub().pull(subscription, 2); + pubsub().ack(subscription, + ImmutableList.of(receivedMessages.next().ackId(), receivedMessages.next().ackId())); + assertFalse(pubsub().pull(subscription, 2).hasNext()); + assertTrue(pubsub().deleteSubscription(subscription)); + assertTrue(pubsub().deleteTopic(topic)); + } + + @Test + public void testAckAndNackMessageListAsync() throws ExecutionException, InterruptedException { + String topic = formatForTest("test-ack-message-list-async-topic"); + pubsub().create(TopicInfo.of(topic)); + String subscription = formatForTest("test-ack-message-list-async-subscription"); + pubsub().create(SubscriptionInfo.of(topic, subscription)); + Message message1 = Message.of("payload1"); + Message message2 = Message.of("payload2"); + assertNotNull(pubsub().publish(topic, ImmutableList.of(message1, message2))); + Iterator receivedMessages = pubsub().pull(subscription, 2); + pubsub().nackAsync(subscription, + ImmutableList.of(receivedMessages.next().ackId(), receivedMessages.next().ackId())).get(); + receivedMessages = pubsub().pull(subscription, 2); + pubsub().ackAsync(subscription, + ImmutableList.of(receivedMessages.next().ackId(), receivedMessages.next().ackId())).get(); + assertFalse(pubsub().pull(subscription, 2).hasNext()); + assertTrue(pubsub().deleteSubscription(subscription)); + assertTrue(pubsub().deleteTopic(topic)); + } } diff --git a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java index 6084266492f0..7a9fee0d999d 100644 --- a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java +++ b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/PubSubImplTest.java @@ -52,6 +52,8 @@ import com.google.pubsub.v1.ModifyPushConfigRequest; import com.google.pubsub.v1.PublishRequest; import com.google.pubsub.v1.PublishResponse; +import com.google.pubsub.v1.PullRequest; +import com.google.pubsub.v1.PullResponse; import org.easymock.EasyMock; import org.junit.After; @@ -60,6 +62,7 @@ import org.junit.Test; import org.junit.rules.ExpectedException; +import java.util.Iterator; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -93,6 +96,18 @@ public com.google.pubsub.v1.Topic apply(TopicInfo topicInfo) { .ackDeadLineSeconds(42) .pushConfig(PUSH_CONFIG) .build(); + private static final Message MESSAGE1 = Message.of("payload1"); + private static final com.google.pubsub.v1.ReceivedMessage MESSAGE_PB1 = + com.google.pubsub.v1.ReceivedMessage.newBuilder() + .setMessage(MESSAGE1.toPb()) + .setAckId("ackId1") + .build(); + private static final Message MESSAGE2 = Message.of("payload2"); + private static final com.google.pubsub.v1.ReceivedMessage MESSAGE_PB2 = + com.google.pubsub.v1.ReceivedMessage.newBuilder() + .setMessage(MESSAGE2.toPb()) + .setAckId("ackId2") + .build(); private static final Function SUBSCRIPTION_TO_PB_FUNCTION = new Function() { @@ -112,6 +127,7 @@ public String apply(SubscriptionId subscriptionId) { private PubSubOptions options; private PubSubRpcFactory rpcFactoryMock; private PubSubRpc pubsubRpcMock; + private AckDeadlineRenewer renewerMock; private PubSub pubsub; @Rule @@ -121,26 +137,32 @@ public String apply(SubscriptionId subscriptionId) { public void setUp() { rpcFactoryMock = EasyMock.createStrictMock(PubSubRpcFactory.class); pubsubRpcMock = EasyMock.createStrictMock(PubSubRpc.class); - EasyMock.expect(rpcFactoryMock.create(EasyMock.anyObject(PubSubOptions.class))) - .andReturn(pubsubRpcMock).times(1); - options = PubSubOptions.builder() - .projectId(PROJECT) - .serviceRpcFactory(rpcFactoryMock) - .retryParams(RetryParams.noRetries()) - .build(); - EasyMock.replay(rpcFactoryMock, pubsubRpcMock); - EasyMock.reset(pubsubRpcMock); + renewerMock = EasyMock.createStrictMock(AckDeadlineRenewer.class); + options = EasyMock.createMock(PubSubOptions.class); + EasyMock.expect(options.projectId()).andReturn(PROJECT).anyTimes(); + EasyMock.expect(options.rpc()).andReturn(pubsubRpcMock).anyTimes(); + EasyMock.expect(options.retryParams()).andReturn(RetryParams.noRetries()).anyTimes(); + EasyMock.replay(rpcFactoryMock, pubsubRpcMock, renewerMock, options); + EasyMock.reset(pubsubRpcMock, renewerMock); } @After public void tearDown() { - EasyMock.verify(rpcFactoryMock, pubsubRpcMock); + EasyMock.verify(rpcFactoryMock, pubsubRpcMock, renewerMock, options); + } + + private void resetOptionsForList(int pageCount) { + EasyMock.reset(options); + EasyMock.expect(options.projectId()).andReturn(PROJECT).times(pageCount); + EasyMock.expect(options.rpc()).andReturn(pubsubRpcMock).times(pageCount); + EasyMock.expect(options.service()).andReturn(pubsub).times(pageCount); + EasyMock.replay(options); } @Test public void testGetOptions() { - EasyMock.replay(pubsubRpcMock); - pubsub = options.service(); + EasyMock.replay(pubsubRpcMock, renewerMock); + pubsub = new PubSubImpl(options, renewerMock); assertSame(options, pubsub.options()); } @@ -149,8 +171,8 @@ public void testCreateTopic() { com.google.pubsub.v1.Topic topicPb = TOPIC_INFO.toPb(PROJECT); Future response = Futures.immediateFuture(topicPb); EasyMock.expect(pubsubRpcMock.create(topicPb)).andReturn(response); - EasyMock.replay(pubsubRpcMock); - pubsub = options.service(); + EasyMock.replay(pubsubRpcMock, renewerMock); + pubsub = new PubSubImpl(options, renewerMock); Topic topic = pubsub.create(TOPIC_INFO); assertEquals(new Topic(pubsub, new TopicInfo.BuilderImpl(TOPIC_INFO)), topic); } @@ -160,8 +182,8 @@ public void testCreateTopicAsync() throws ExecutionException, InterruptedExcepti com.google.pubsub.v1.Topic topicPb = TOPIC_INFO.toPb(PROJECT); Future response = Futures.immediateFuture(topicPb); EasyMock.expect(pubsubRpcMock.create(topicPb)).andReturn(response); - EasyMock.replay(pubsubRpcMock); - pubsub = options.service(); + EasyMock.replay(pubsubRpcMock, renewerMock); + pubsub = new PubSubImpl(options, renewerMock); Topic topic = pubsub.createAsync(TOPIC_INFO).get(); assertEquals(new Topic(pubsub, new TopicInfo.BuilderImpl(TOPIC_INFO)), topic); } @@ -172,8 +194,8 @@ public void testGetTopic() { Future response = Futures.immediateFuture(TOPIC_INFO.toPb(PROJECT)); EasyMock.expect(pubsubRpcMock.get(request)).andReturn(response); - EasyMock.replay(pubsubRpcMock); - pubsub = options.service(); + EasyMock.replay(pubsubRpcMock, renewerMock); + pubsub = new PubSubImpl(options, renewerMock); Topic topic = pubsub.getTopic(TOPIC); assertEquals(new Topic(pubsub, new TopicInfo.BuilderImpl(TOPIC_INFO)), topic); } @@ -183,8 +205,8 @@ public void testGetTopic_Null() { GetTopicRequest request = GetTopicRequest.newBuilder().setTopic(TOPIC_NAME_PB).build(); Future responseFuture = Futures.immediateFuture(null); EasyMock.expect(pubsubRpcMock.get(request)).andReturn(responseFuture); - EasyMock.replay(pubsubRpcMock); - pubsub = options.service(); + EasyMock.replay(pubsubRpcMock, renewerMock); + pubsub = new PubSubImpl(options, renewerMock); assertNull(pubsub.getTopic(TOPIC)); } @@ -194,8 +216,8 @@ public void testGetTopicAsync() throws ExecutionException, InterruptedException Future response = Futures.immediateFuture(TOPIC_INFO.toPb(PROJECT)); EasyMock.expect(pubsubRpcMock.get(request)).andReturn(response); - EasyMock.replay(pubsubRpcMock); - pubsub = options.service(); + EasyMock.replay(pubsubRpcMock, renewerMock); + pubsub = new PubSubImpl(options, renewerMock); Future topicFuture = pubsub.getTopicAsync(TOPIC); assertEquals(new Topic(pubsub, new TopicInfo.BuilderImpl(TOPIC_INFO)), topicFuture.get()); } @@ -205,8 +227,8 @@ public void testGetTopicAsync_Null() throws ExecutionException, InterruptedExcep GetTopicRequest request = GetTopicRequest.newBuilder().setTopic(TOPIC_NAME_PB).build(); Future responseFuture = Futures.immediateFuture(null); EasyMock.expect(pubsubRpcMock.get(request)).andReturn(responseFuture); - EasyMock.replay(pubsubRpcMock); - pubsub = options.service(); + EasyMock.replay(pubsubRpcMock, renewerMock); + pubsub = new PubSubImpl(options, renewerMock); assertNull(pubsub.getTopicAsync(TOPIC).get()); } @@ -215,8 +237,8 @@ public void testDeleteTopic() { DeleteTopicRequest request = DeleteTopicRequest.newBuilder().setTopic(TOPIC_NAME_PB).build(); Future response = Futures.immediateFuture(Empty.getDefaultInstance()); EasyMock.expect(pubsubRpcMock.delete(request)).andReturn(response); - EasyMock.replay(pubsubRpcMock); - pubsub = options.service(); + EasyMock.replay(pubsubRpcMock, renewerMock); + pubsub = new PubSubImpl(options, renewerMock); assertTrue(pubsub.deleteTopic(TOPIC)); } @@ -225,8 +247,8 @@ public void testDeleteTopic_Null() { DeleteTopicRequest request = DeleteTopicRequest.newBuilder().setTopic(TOPIC_NAME_PB).build(); Future response = Futures.immediateFuture(null); EasyMock.expect(pubsubRpcMock.delete(request)).andReturn(response); - EasyMock.replay(pubsubRpcMock); - pubsub = options.service(); + EasyMock.replay(pubsubRpcMock, renewerMock); + pubsub = new PubSubImpl(options, renewerMock); assertFalse(pubsub.deleteTopic(TOPIC)); } @@ -235,8 +257,8 @@ public void testDeleteTopicAsync() throws ExecutionException, InterruptedExcepti DeleteTopicRequest request = DeleteTopicRequest.newBuilder().setTopic(TOPIC_NAME_PB).build(); Future response = Futures.immediateFuture(Empty.getDefaultInstance()); EasyMock.expect(pubsubRpcMock.delete(request)).andReturn(response); - EasyMock.replay(pubsubRpcMock); - pubsub = options.service(); + EasyMock.replay(pubsubRpcMock, renewerMock); + pubsub = new PubSubImpl(options, renewerMock); assertTrue(pubsub.deleteTopicAsync(TOPIC).get()); } @@ -245,15 +267,16 @@ public void testDeleteTopicAsync_Null() throws ExecutionException, InterruptedEx DeleteTopicRequest request = DeleteTopicRequest.newBuilder().setTopic(TOPIC_NAME_PB).build(); Future response = Futures.immediateFuture(null); EasyMock.expect(pubsubRpcMock.delete(request)).andReturn(response); - EasyMock.replay(pubsubRpcMock); - pubsub = options.service(); + EasyMock.replay(pubsubRpcMock, renewerMock); + pubsub = new PubSubImpl(options, renewerMock); assertFalse(pubsub.deleteTopicAsync(TOPIC).get()); } @Test public void testListTopics() { String cursor = "cursor"; - pubsub = options.service(); + pubsub = new PubSubImpl(options, renewerMock); + resetOptionsForList(1); ListTopicsRequest request = ListTopicsRequest.newBuilder().setProject(PROJECT_PB).build(); List topicList = ImmutableList.of( new Topic(pubsub, new TopicInfo.BuilderImpl(TOPIC_INFO)), @@ -264,7 +287,7 @@ public void testListTopics() { .build(); Future futureResponse = Futures.immediateFuture(response); EasyMock.expect(pubsubRpcMock.list(request)).andReturn(futureResponse); - EasyMock.replay(pubsubRpcMock); + EasyMock.replay(pubsubRpcMock, renewerMock); Page page = pubsub.listTopics(); assertEquals(cursor, page.nextPageCursor()); assertArrayEquals(topicList.toArray(), Iterables.toArray(page.values(), Topic.class)); @@ -273,7 +296,8 @@ public void testListTopics() { @Test public void testListTopicsNextPage() { String cursor1 = "cursor"; - pubsub = options.service(); + pubsub = new PubSubImpl(options, renewerMock); + resetOptionsForList(2); ListTopicsRequest request1 = ListTopicsRequest.newBuilder().setProject(PROJECT_PB).build(); ListTopicsRequest request2 = ListTopicsRequest.newBuilder() .setProject(PROJECT_PB) @@ -297,7 +321,7 @@ public void testListTopicsNextPage() { Future futureResponse2 = Futures.immediateFuture(response2); EasyMock.expect(pubsubRpcMock.list(request1)).andReturn(futureResponse1); EasyMock.expect(pubsubRpcMock.list(request2)).andReturn(futureResponse2); - EasyMock.replay(pubsubRpcMock); + EasyMock.replay(pubsubRpcMock, renewerMock); Page page = pubsub.listTopics(); assertEquals(cursor1, page.nextPageCursor()); assertArrayEquals(topicList1.toArray(), Iterables.toArray(page.values(), Topic.class)); @@ -308,7 +332,8 @@ public void testListTopicsNextPage() { @Test public void testListTopicsEmpty() { - pubsub = options.service(); + pubsub = new PubSubImpl(options, renewerMock); + resetOptionsForList(1); ListTopicsRequest request = ListTopicsRequest.newBuilder().setProject(PROJECT_PB).build(); List topicList = ImmutableList.of(); ListTopicsResponse response = ListTopicsResponse.newBuilder() @@ -317,7 +342,7 @@ public void testListTopicsEmpty() { .build(); Future futureResponse = Futures.immediateFuture(response); EasyMock.expect(pubsubRpcMock.list(request)).andReturn(futureResponse); - EasyMock.replay(pubsubRpcMock); + EasyMock.replay(pubsubRpcMock, renewerMock); Page page = pubsub.listTopics(); assertNull(page.nextPageCursor()); assertNull(page.nextPage()); @@ -327,7 +352,8 @@ public void testListTopicsEmpty() { @Test public void testListTopicsWithOptions() { String cursor = "cursor"; - pubsub = options.service(); + pubsub = new PubSubImpl(options, renewerMock); + resetOptionsForList(1); ListTopicsRequest request = ListTopicsRequest.newBuilder() .setProject(PROJECT_PB) .setPageSize(42) @@ -342,7 +368,7 @@ public void testListTopicsWithOptions() { .build(); Future futureResponse = Futures.immediateFuture(response); EasyMock.expect(pubsubRpcMock.list(request)).andReturn(futureResponse); - EasyMock.replay(pubsubRpcMock); + EasyMock.replay(pubsubRpcMock, renewerMock); Page page = pubsub.listTopics(ListOption.pageSize(42), ListOption.pageToken(cursor)); assertNull(page.nextPageCursor()); assertNull(page.nextPage()); @@ -352,7 +378,8 @@ public void testListTopicsWithOptions() { @Test public void testListTopicsAsync() throws ExecutionException, InterruptedException { String cursor = "cursor"; - pubsub = options.service(); + pubsub = new PubSubImpl(options, renewerMock); + resetOptionsForList(1); ListTopicsRequest request = ListTopicsRequest.newBuilder().setProject(PROJECT_PB).build(); List topicList = ImmutableList.of( new Topic(pubsub, new TopicInfo.BuilderImpl(TOPIC_INFO)), @@ -363,7 +390,7 @@ public void testListTopicsAsync() throws ExecutionException, InterruptedExceptio .build(); Future futureResponse = Futures.immediateFuture(response); EasyMock.expect(pubsubRpcMock.list(request)).andReturn(futureResponse); - EasyMock.replay(pubsubRpcMock); + EasyMock.replay(pubsubRpcMock, renewerMock); AsyncPage page = pubsub.listTopicsAsync().get(); assertEquals(cursor, page.nextPageCursor()); assertArrayEquals(topicList.toArray(), Iterables.toArray(page.values(), Topic.class)); @@ -372,7 +399,8 @@ public void testListTopicsAsync() throws ExecutionException, InterruptedExceptio @Test public void testListTopicsAsyncNextPage() throws ExecutionException, InterruptedException { String cursor1 = "cursor"; - pubsub = options.service(); + pubsub = new PubSubImpl(options, renewerMock); + resetOptionsForList(2); ListTopicsRequest request1 = ListTopicsRequest.newBuilder().setProject(PROJECT_PB).build(); ListTopicsRequest request2 = ListTopicsRequest.newBuilder() .setProject(PROJECT_PB) @@ -396,7 +424,7 @@ public void testListTopicsAsyncNextPage() throws ExecutionException, Interrupted Future futureResponse2 = Futures.immediateFuture(response2); EasyMock.expect(pubsubRpcMock.list(request1)).andReturn(futureResponse1); EasyMock.expect(pubsubRpcMock.list(request2)).andReturn(futureResponse2); - EasyMock.replay(pubsubRpcMock); + EasyMock.replay(pubsubRpcMock, renewerMock); AsyncPage page = pubsub.listTopicsAsync().get(); assertEquals(cursor1, page.nextPageCursor()); assertArrayEquals(topicList1.toArray(), Iterables.toArray(page.values(), Topic.class)); @@ -407,7 +435,8 @@ public void testListTopicsAsyncNextPage() throws ExecutionException, Interrupted @Test public void testListTopicsAsyncEmpty() throws ExecutionException, InterruptedException { - pubsub = options.service(); + pubsub = new PubSubImpl(options, renewerMock); + resetOptionsForList(1); ListTopicsRequest request = ListTopicsRequest.newBuilder().setProject(PROJECT_PB).build(); List topicList = ImmutableList.of(); ListTopicsResponse response = ListTopicsResponse.newBuilder() @@ -416,7 +445,7 @@ public void testListTopicsAsyncEmpty() throws ExecutionException, InterruptedExc .build(); Future futureResponse = Futures.immediateFuture(response); EasyMock.expect(pubsubRpcMock.list(request)).andReturn(futureResponse); - EasyMock.replay(pubsubRpcMock); + EasyMock.replay(pubsubRpcMock, renewerMock); AsyncPage page = pubsub.listTopicsAsync().get(); assertNull(page.nextPageCursor()); assertNull(page.nextPageAsync().get()); @@ -427,7 +456,8 @@ public void testListTopicsAsyncEmpty() throws ExecutionException, InterruptedExc @Test public void testListTopicsAsyncWithOptions() throws ExecutionException, InterruptedException { String cursor = "cursor"; - pubsub = options.service(); + pubsub = new PubSubImpl(options, renewerMock); + resetOptionsForList(1); ListTopicsRequest request = ListTopicsRequest.newBuilder() .setProject(PROJECT_PB) .setPageSize(42) @@ -442,7 +472,7 @@ public void testListTopicsAsyncWithOptions() throws ExecutionException, Interrup .build(); Future futureResponse = Futures.immediateFuture(response); EasyMock.expect(pubsubRpcMock.list(request)).andReturn(futureResponse); - EasyMock.replay(pubsubRpcMock); + EasyMock.replay(pubsubRpcMock, renewerMock); AsyncPage page = pubsub.listTopicsAsync(ListOption.pageSize(42), ListOption.pageToken(cursor)).get(); assertNull(page.nextPageCursor()); @@ -460,8 +490,8 @@ public void testPublishOneMessage() { PublishResponse response = PublishResponse.newBuilder().addMessageIds(messageId).build(); Future responseFuture = Futures.immediateFuture(response); EasyMock.expect(pubsubRpcMock.publish(request)).andReturn(responseFuture); - EasyMock.replay(pubsubRpcMock); - pubsub = options.service(); + EasyMock.replay(pubsubRpcMock, renewerMock); + pubsub = new PubSubImpl(options, renewerMock); assertEquals(messageId, pubsub.publish(TOPIC, MESSAGE)); } @@ -475,8 +505,8 @@ public void testPublishOneMessageAsync() throws ExecutionException, InterruptedE PublishResponse response = PublishResponse.newBuilder().addMessageIds(messageId).build(); Future responseFuture = Futures.immediateFuture(response); EasyMock.expect(pubsubRpcMock.publish(request)).andReturn(responseFuture); - EasyMock.replay(pubsubRpcMock); - pubsub = options.service(); + EasyMock.replay(pubsubRpcMock, renewerMock); + pubsub = new PubSubImpl(options, renewerMock); assertEquals(messageId, pubsub.publishAsync(TOPIC, MESSAGE).get()); } @@ -492,8 +522,8 @@ public void testPublishMoreMessages() { .build(); Future responseFuture = Futures.immediateFuture(response); EasyMock.expect(pubsubRpcMock.publish(request)).andReturn(responseFuture); - EasyMock.replay(pubsubRpcMock); - pubsub = options.service(); + EasyMock.replay(pubsubRpcMock, renewerMock); + pubsub = new PubSubImpl(options, renewerMock); assertEquals(messageIds, pubsub.publish(TOPIC, MESSAGE, MESSAGE)); } @@ -509,8 +539,8 @@ public void testPublishMoreMessagesAsync() throws ExecutionException, Interrupte .build(); Future responseFuture = Futures.immediateFuture(response); EasyMock.expect(pubsubRpcMock.publish(request)).andReturn(responseFuture); - EasyMock.replay(pubsubRpcMock); - pubsub = options.service(); + EasyMock.replay(pubsubRpcMock, renewerMock); + pubsub = new PubSubImpl(options, renewerMock); assertEquals(messageIds, pubsub.publishAsync(TOPIC, MESSAGE, MESSAGE).get()); } @@ -526,8 +556,8 @@ public void testPublishMessageList() { .build(); Future responseFuture = Futures.immediateFuture(response); EasyMock.expect(pubsubRpcMock.publish(request)).andReturn(responseFuture); - EasyMock.replay(pubsubRpcMock); - pubsub = options.service(); + EasyMock.replay(pubsubRpcMock, renewerMock); + pubsub = new PubSubImpl(options, renewerMock); assertEquals(messageIds, pubsub.publish(TOPIC, ImmutableList.of(MESSAGE, MESSAGE))); } @@ -543,8 +573,8 @@ public void testPublishMessageListAsync() throws ExecutionException, Interrupted .build(); Future responseFuture = Futures.immediateFuture(response); EasyMock.expect(pubsubRpcMock.publish(request)).andReturn(responseFuture); - EasyMock.replay(pubsubRpcMock); - pubsub = options.service(); + EasyMock.replay(pubsubRpcMock, renewerMock); + pubsub = new PubSubImpl(options, renewerMock); assertEquals(messageIds, pubsub.publishAsync(TOPIC, ImmutableList.of(MESSAGE, MESSAGE)).get()); } @@ -554,8 +584,8 @@ public void testCreateSubscription() { Future response = Futures.immediateFuture(subscriptionPb); EasyMock.expect(pubsubRpcMock.create(subscriptionPb)).andReturn(response); - EasyMock.replay(pubsubRpcMock); - pubsub = options.service(); + EasyMock.replay(pubsubRpcMock, renewerMock); + pubsub = new PubSubImpl(options, renewerMock); Subscription subscription = pubsub.create(SUBSCRIPTION_INFO); assertEquals( new Subscription(pubsub, new SubscriptionInfo.BuilderImpl(COMPLETE_SUBSCRIPTION_INFO)), @@ -568,8 +598,8 @@ public void testCreateSubscriptionAsync() throws ExecutionException, Interrupted Future response = Futures.immediateFuture(subscriptionPb); EasyMock.expect(pubsubRpcMock.create(subscriptionPb)).andReturn(response); - EasyMock.replay(pubsubRpcMock); - pubsub = options.service(); + EasyMock.replay(pubsubRpcMock, renewerMock); + pubsub = new PubSubImpl(options, renewerMock); Subscription subscription = pubsub.createAsync(SUBSCRIPTION_INFO).get(); assertEquals( new Subscription(pubsub, new SubscriptionInfo.BuilderImpl(COMPLETE_SUBSCRIPTION_INFO)), @@ -583,8 +613,8 @@ public void testGetSubscription() { Future response = Futures.immediateFuture(SUBSCRIPTION_INFO.toPb(PROJECT)); EasyMock.expect(pubsubRpcMock.get(request)).andReturn(response); - EasyMock.replay(pubsubRpcMock); - pubsub = options.service(); + EasyMock.replay(pubsubRpcMock, renewerMock); + pubsub = new PubSubImpl(options, renewerMock); Subscription subscription = pubsub.getSubscription(SUBSCRIPTION); assertEquals( new Subscription(pubsub, new SubscriptionInfo.BuilderImpl(COMPLETE_SUBSCRIPTION_INFO)), @@ -597,8 +627,8 @@ public void testGetSubscription_Null() { GetSubscriptionRequest.newBuilder().setSubscription(SUBSCRIPTION_NAME_PB).build(); Future response = Futures.immediateFuture(null); EasyMock.expect(pubsubRpcMock.get(request)).andReturn(response); - EasyMock.replay(pubsubRpcMock); - pubsub = options.service(); + EasyMock.replay(pubsubRpcMock, renewerMock); + pubsub = new PubSubImpl(options, renewerMock); assertNull(pubsub.getSubscription(SUBSCRIPTION)); } @@ -609,8 +639,8 @@ public void testGetSubscriptionAsync() throws ExecutionException, InterruptedExc Future response = Futures.immediateFuture(SUBSCRIPTION_INFO.toPb(PROJECT)); EasyMock.expect(pubsubRpcMock.get(request)).andReturn(response); - EasyMock.replay(pubsubRpcMock); - pubsub = options.service(); + EasyMock.replay(pubsubRpcMock, renewerMock); + pubsub = new PubSubImpl(options, renewerMock); Subscription subscription = pubsub.getSubscriptionAsync(SUBSCRIPTION).get(); assertEquals( new Subscription(pubsub, new SubscriptionInfo.BuilderImpl(COMPLETE_SUBSCRIPTION_INFO)), @@ -623,8 +653,8 @@ public void testGetSubscriptionAsync_Null() throws ExecutionException, Interrupt GetSubscriptionRequest.newBuilder().setSubscription(SUBSCRIPTION_NAME_PB).build(); Future response = Futures.immediateFuture(null); EasyMock.expect(pubsubRpcMock.get(request)).andReturn(response); - EasyMock.replay(pubsubRpcMock); - pubsub = options.service(); + EasyMock.replay(pubsubRpcMock, renewerMock); + pubsub = new PubSubImpl(options, renewerMock); assertNull(pubsub.getSubscriptionAsync(SUBSCRIPTION).get()); } @@ -635,8 +665,8 @@ public void testDeleteSubscription() { .build(); Future response = Futures.immediateFuture(Empty.getDefaultInstance()); EasyMock.expect(pubsubRpcMock.delete(request)).andReturn(response); - EasyMock.replay(pubsubRpcMock); - pubsub = options.service(); + EasyMock.replay(pubsubRpcMock, renewerMock); + pubsub = new PubSubImpl(options, renewerMock); assertTrue(pubsub.deleteSubscription(SUBSCRIPTION)); } @@ -647,8 +677,8 @@ public void testDeleteSubscription_Null() { .build(); Future response = Futures.immediateFuture(null); EasyMock.expect(pubsubRpcMock.delete(request)).andReturn(response); - EasyMock.replay(pubsubRpcMock); - pubsub = options.service(); + EasyMock.replay(pubsubRpcMock, renewerMock); + pubsub = new PubSubImpl(options, renewerMock); assertFalse(pubsub.deleteSubscription(SUBSCRIPTION)); } @@ -659,8 +689,8 @@ public void testDeleteSubscriptionAsync() throws ExecutionException, Interrupted .build(); Future response = Futures.immediateFuture(Empty.getDefaultInstance()); EasyMock.expect(pubsubRpcMock.delete(request)).andReturn(response); - EasyMock.replay(pubsubRpcMock); - pubsub = options.service(); + EasyMock.replay(pubsubRpcMock, renewerMock); + pubsub = new PubSubImpl(options, renewerMock); assertTrue(pubsub.deleteSubscriptionAsync(SUBSCRIPTION).get()); } @@ -671,8 +701,8 @@ public void testDeleteSubscriptionAsync_Null() throws ExecutionException, Interr .build(); Future response = Futures.immediateFuture(null); EasyMock.expect(pubsubRpcMock.delete(request)).andReturn(response); - EasyMock.replay(pubsubRpcMock); - pubsub = options.service(); + EasyMock.replay(pubsubRpcMock, renewerMock); + pubsub = new PubSubImpl(options, renewerMock); assertFalse(pubsub.deleteSubscriptionAsync(SUBSCRIPTION).get()); } @@ -684,8 +714,8 @@ public void testReplacePushConfig() { .build(); Future response = Futures.immediateFuture(Empty.getDefaultInstance()); EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response); - EasyMock.replay(pubsubRpcMock); - pubsub = options.service(); + EasyMock.replay(pubsubRpcMock, renewerMock); + pubsub = new PubSubImpl(options, renewerMock); pubsub.replacePushConfig(SUBSCRIPTION, PUSH_CONFIG); } @@ -697,8 +727,8 @@ public void testReplacePushConfig_Null() { .build(); Future response = Futures.immediateFuture(Empty.getDefaultInstance()); EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response); - EasyMock.replay(pubsubRpcMock); - pubsub = options.service(); + EasyMock.replay(pubsubRpcMock, renewerMock); + pubsub = new PubSubImpl(options, renewerMock); pubsub.replacePushConfig(SUBSCRIPTION, null); } @@ -710,8 +740,8 @@ public void testReplacePushConfigAsync() throws ExecutionException, InterruptedE .build(); Future response = Futures.immediateFuture(Empty.getDefaultInstance()); EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response); - EasyMock.replay(pubsubRpcMock); - pubsub = options.service(); + EasyMock.replay(pubsubRpcMock, renewerMock); + pubsub = new PubSubImpl(options, renewerMock); pubsub.replacePushConfigAsync(SUBSCRIPTION, PUSH_CONFIG).get(); } @@ -723,15 +753,16 @@ public void testReplacePushConfigAsync_Null() throws ExecutionException, Interru .build(); Future response = Futures.immediateFuture(Empty.getDefaultInstance()); EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response); - EasyMock.replay(pubsubRpcMock); - pubsub = options.service(); + EasyMock.replay(pubsubRpcMock, renewerMock); + pubsub = new PubSubImpl(options, renewerMock); pubsub.replacePushConfigAsync(SUBSCRIPTION, null).get(); } @Test public void testListSubscriptions() { String cursor = "cursor"; - pubsub = options.service(); + pubsub = new PubSubImpl(options, renewerMock); + resetOptionsForList(1); ListSubscriptionsRequest request = ListSubscriptionsRequest.newBuilder() .setProject(PROJECT_PB) .build(); @@ -744,7 +775,7 @@ public void testListSubscriptions() { .build(); Future futureResponse = Futures.immediateFuture(response); EasyMock.expect(pubsubRpcMock.list(request)).andReturn(futureResponse); - EasyMock.replay(pubsubRpcMock); + EasyMock.replay(pubsubRpcMock, renewerMock); Page page = pubsub.listSubscriptions(); assertEquals(cursor, page.nextPageCursor()); assertArrayEquals(subscriptionList.toArray(), @@ -754,7 +785,8 @@ public void testListSubscriptions() { @Test public void testListSubscriptionsNextPage() { String cursor1 = "cursor"; - pubsub = options.service(); + pubsub = new PubSubImpl(options, renewerMock); + resetOptionsForList(2); ListSubscriptionsRequest request1 = ListSubscriptionsRequest.newBuilder() .setProject(PROJECT_PB) .build(); @@ -780,7 +812,7 @@ public void testListSubscriptionsNextPage() { Future futureResponse2 = Futures.immediateFuture(response2); EasyMock.expect(pubsubRpcMock.list(request1)).andReturn(futureResponse1); EasyMock.expect(pubsubRpcMock.list(request2)).andReturn(futureResponse2); - EasyMock.replay(pubsubRpcMock); + EasyMock.replay(pubsubRpcMock, renewerMock); Page page = pubsub.listSubscriptions(); assertEquals(cursor1, page.nextPageCursor()); assertArrayEquals(subscriptionList1.toArray(), @@ -793,7 +825,8 @@ public void testListSubscriptionsNextPage() { @Test public void testListSubscriptionsEmpty() { - pubsub = options.service(); + pubsub = new PubSubImpl(options, renewerMock); + resetOptionsForList(1); ListSubscriptionsRequest request = ListSubscriptionsRequest.newBuilder() .setProject(PROJECT_PB) .build(); @@ -804,7 +837,7 @@ public void testListSubscriptionsEmpty() { .build(); Future futureResponse = Futures.immediateFuture(response); EasyMock.expect(pubsubRpcMock.list(request)).andReturn(futureResponse); - EasyMock.replay(pubsubRpcMock); + EasyMock.replay(pubsubRpcMock, renewerMock); Page page = pubsub.listSubscriptions(); assertNull(page.nextPageCursor()); assertNull(page.nextPage()); @@ -815,7 +848,8 @@ public void testListSubscriptionsEmpty() { @Test public void testListSubscriptionsWithOptions() { String cursor = "cursor"; - pubsub = options.service(); + pubsub = new PubSubImpl(options, renewerMock); + resetOptionsForList(1); ListSubscriptionsRequest request = ListSubscriptionsRequest.newBuilder() .setProject(PROJECT_PB) .setPageSize(42) @@ -830,7 +864,7 @@ public void testListSubscriptionsWithOptions() { .build(); Future futureResponse = Futures.immediateFuture(response); EasyMock.expect(pubsubRpcMock.list(request)).andReturn(futureResponse); - EasyMock.replay(pubsubRpcMock); + EasyMock.replay(pubsubRpcMock, renewerMock); Page page = pubsub.listSubscriptions(ListOption.pageSize(42), ListOption.pageToken(cursor)); assertNull(page.nextPageCursor()); @@ -842,7 +876,8 @@ public void testListSubscriptionsWithOptions() { @Test public void testListSubscriptionsAsync() throws ExecutionException, InterruptedException { String cursor = "cursor"; - pubsub = options.service(); + pubsub = new PubSubImpl(options, renewerMock); + resetOptionsForList(1); ListSubscriptionsRequest request = ListSubscriptionsRequest.newBuilder() .setProject(PROJECT_PB) .build(); @@ -855,7 +890,7 @@ public void testListSubscriptionsAsync() throws ExecutionException, InterruptedE .build(); Future futureResponse = Futures.immediateFuture(response); EasyMock.expect(pubsubRpcMock.list(request)).andReturn(futureResponse); - EasyMock.replay(pubsubRpcMock); + EasyMock.replay(pubsubRpcMock, renewerMock); AsyncPage page = pubsub.listSubscriptionsAsync().get(); assertEquals(cursor, page.nextPageCursor()); assertArrayEquals(subscriptionList.toArray(), @@ -865,7 +900,8 @@ public void testListSubscriptionsAsync() throws ExecutionException, InterruptedE @Test public void testListSubscriptionsAsyncNextPage() throws ExecutionException, InterruptedException { String cursor1 = "cursor"; - pubsub = options.service(); + pubsub = new PubSubImpl(options, renewerMock); + resetOptionsForList(2); ListSubscriptionsRequest request1 = ListSubscriptionsRequest.newBuilder() .setProject(PROJECT_PB) .build(); @@ -891,7 +927,7 @@ public void testListSubscriptionsAsyncNextPage() throws ExecutionException, Inte Future futureResponse2 = Futures.immediateFuture(response2); EasyMock.expect(pubsubRpcMock.list(request1)).andReturn(futureResponse1); EasyMock.expect(pubsubRpcMock.list(request2)).andReturn(futureResponse2); - EasyMock.replay(pubsubRpcMock); + EasyMock.replay(pubsubRpcMock, renewerMock); AsyncPage page = pubsub.listSubscriptionsAsync().get(); assertEquals(cursor1, page.nextPageCursor()); assertArrayEquals(subscriptionList1.toArray(), @@ -904,7 +940,8 @@ public void testListSubscriptionsAsyncNextPage() throws ExecutionException, Inte @Test public void testListSubscriptionsAsyncEmpty() throws ExecutionException, InterruptedException { - pubsub = options.service(); + pubsub = new PubSubImpl(options, renewerMock); + resetOptionsForList(1); ListSubscriptionsRequest request = ListSubscriptionsRequest.newBuilder() .setProject(PROJECT_PB) .build(); @@ -915,7 +952,7 @@ public void testListSubscriptionsAsyncEmpty() throws ExecutionException, Interru .build(); Future futureResponse = Futures.immediateFuture(response); EasyMock.expect(pubsubRpcMock.list(request)).andReturn(futureResponse); - EasyMock.replay(pubsubRpcMock); + EasyMock.replay(pubsubRpcMock, renewerMock); AsyncPage page = pubsub.listSubscriptionsAsync().get(); assertNull(page.nextPageCursor()); assertNull(page.nextPageAsync().get()); @@ -928,7 +965,8 @@ public void testListSubscriptionsAsyncEmpty() throws ExecutionException, Interru public void testListSubscriptionsAsyncWithOptions() throws ExecutionException, InterruptedException { String cursor = "cursor"; - pubsub = options.service(); + pubsub = new PubSubImpl(options, renewerMock); + resetOptionsForList(1); ListSubscriptionsRequest request = ListSubscriptionsRequest.newBuilder() .setProject(PROJECT_PB) .setPageSize(42) @@ -943,7 +981,7 @@ public void testListSubscriptionsAsyncWithOptions() .build(); Future futureResponse = Futures.immediateFuture(response); EasyMock.expect(pubsubRpcMock.list(request)).andReturn(futureResponse); - EasyMock.replay(pubsubRpcMock); + EasyMock.replay(pubsubRpcMock, renewerMock); AsyncPage page = pubsub.listSubscriptionsAsync(ListOption.pageSize(42), ListOption.pageToken(cursor)).get(); assertNull(page.nextPageCursor()); @@ -956,7 +994,7 @@ public void testListSubscriptionsAsyncWithOptions() @Test public void testListTopicSubscriptions() { String cursor = "cursor"; - pubsub = options.service(); + pubsub = new PubSubImpl(options, renewerMock); ListTopicSubscriptionsRequest request = ListTopicSubscriptionsRequest.newBuilder() .setTopic(TOPIC_NAME_PB) .build(); @@ -970,7 +1008,7 @@ public void testListTopicSubscriptions() { Future futureResponse = Futures.immediateFuture(response); EasyMock.expect(pubsubRpcMock.list(request)).andReturn(futureResponse); - EasyMock.replay(pubsubRpcMock); + EasyMock.replay(pubsubRpcMock, renewerMock); Page page = pubsub.listSubscriptions(TOPIC); assertEquals(cursor, page.nextPageCursor()); assertArrayEquals(subscriptionList.toArray(), @@ -980,7 +1018,7 @@ public void testListTopicSubscriptions() { @Test public void testListTopicSubscriptionsNextPage() { String cursor1 = "cursor"; - pubsub = options.service(); + pubsub = new PubSubImpl(options, renewerMock); ListTopicSubscriptionsRequest request1 = ListTopicSubscriptionsRequest.newBuilder() .setTopic(TOPIC_NAME_PB) .build(); @@ -1008,7 +1046,7 @@ public void testListTopicSubscriptionsNextPage() { Futures.immediateFuture(response2); EasyMock.expect(pubsubRpcMock.list(request1)).andReturn(futureResponse1); EasyMock.expect(pubsubRpcMock.list(request2)).andReturn(futureResponse2); - EasyMock.replay(pubsubRpcMock); + EasyMock.replay(pubsubRpcMock, renewerMock); Page page = pubsub.listSubscriptions(TOPIC); assertEquals(cursor1, page.nextPageCursor()); assertArrayEquals(subscriptionList1.toArray(), @@ -1021,7 +1059,7 @@ public void testListTopicSubscriptionsNextPage() { @Test public void testListTopicSubscriptionsEmpty() { - pubsub = options.service(); + pubsub = new PubSubImpl(options, renewerMock); ListTopicSubscriptionsRequest request = ListTopicSubscriptionsRequest.newBuilder() .setTopic(TOPIC_NAME_PB) .build(); @@ -1033,7 +1071,7 @@ public void testListTopicSubscriptionsEmpty() { Future futureResponse = Futures.immediateFuture(response); EasyMock.expect(pubsubRpcMock.list(request)).andReturn(futureResponse); - EasyMock.replay(pubsubRpcMock); + EasyMock.replay(pubsubRpcMock, renewerMock); Page page = pubsub.listSubscriptions(TOPIC); assertNull(page.nextPageCursor()); assertNull(page.nextPage()); @@ -1044,7 +1082,7 @@ public void testListTopicSubscriptionsEmpty() { @Test public void testListTopicSubscriptionsWithOptions() { String cursor = "cursor"; - pubsub = options.service(); + pubsub = new PubSubImpl(options, renewerMock); ListTopicSubscriptionsRequest request = ListTopicSubscriptionsRequest.newBuilder() .setTopic(TOPIC_NAME_PB) .setPageSize(42) @@ -1060,7 +1098,7 @@ public void testListTopicSubscriptionsWithOptions() { Future futureResponse = Futures.immediateFuture(response); EasyMock.expect(pubsubRpcMock.list(request)).andReturn(futureResponse); - EasyMock.replay(pubsubRpcMock); + EasyMock.replay(pubsubRpcMock, renewerMock); Page page = pubsub.listSubscriptions(TOPIC, ListOption.pageSize(42), ListOption.pageToken(cursor)); assertNull(page.nextPageCursor()); @@ -1072,7 +1110,7 @@ public void testListTopicSubscriptionsWithOptions() { @Test public void testListTopicSubscriptionsAsync() throws ExecutionException, InterruptedException { String cursor = "cursor"; - pubsub = options.service(); + pubsub = new PubSubImpl(options, renewerMock); ListTopicSubscriptionsRequest request = ListTopicSubscriptionsRequest.newBuilder() .setTopic(TOPIC_NAME_PB) .build(); @@ -1086,7 +1124,7 @@ public void testListTopicSubscriptionsAsync() throws ExecutionException, Interru Future futureResponse = Futures.immediateFuture(response); EasyMock.expect(pubsubRpcMock.list(request)).andReturn(futureResponse); - EasyMock.replay(pubsubRpcMock); + EasyMock.replay(pubsubRpcMock, renewerMock); AsyncPage page = pubsub.listSubscriptionsAsync(TOPIC).get(); assertEquals(cursor, page.nextPageCursor()); assertArrayEquals(subscriptionList.toArray(), @@ -1097,7 +1135,7 @@ public void testListTopicSubscriptionsAsync() throws ExecutionException, Interru public void testListTopicSubscriptionsAsyncNextPage() throws ExecutionException, InterruptedException { String cursor1 = "cursor"; - pubsub = options.service(); + pubsub = new PubSubImpl(options, renewerMock); ListTopicSubscriptionsRequest request1 = ListTopicSubscriptionsRequest.newBuilder() .setTopic(TOPIC_NAME_PB) .build(); @@ -1125,7 +1163,7 @@ public void testListTopicSubscriptionsAsyncNextPage() Futures.immediateFuture(response2); EasyMock.expect(pubsubRpcMock.list(request1)).andReturn(futureResponse1); EasyMock.expect(pubsubRpcMock.list(request2)).andReturn(futureResponse2); - EasyMock.replay(pubsubRpcMock); + EasyMock.replay(pubsubRpcMock, renewerMock); AsyncPage page = pubsub.listSubscriptionsAsync(TOPIC).get(); assertEquals(cursor1, page.nextPageCursor()); assertArrayEquals(subscriptionList1.toArray(), @@ -1139,7 +1177,7 @@ public void testListTopicSubscriptionsAsyncNextPage() @Test public void testListTopicSubscriptionsAsyncEmpty() throws ExecutionException, InterruptedException { - pubsub = options.service(); + pubsub = new PubSubImpl(options, renewerMock); ListTopicSubscriptionsRequest request = ListTopicSubscriptionsRequest.newBuilder() .setTopic(TOPIC_NAME_PB) .build(); @@ -1151,7 +1189,7 @@ public void testListTopicSubscriptionsAsyncEmpty() Future futureResponse = Futures.immediateFuture(response); EasyMock.expect(pubsubRpcMock.list(request)).andReturn(futureResponse); - EasyMock.replay(pubsubRpcMock); + EasyMock.replay(pubsubRpcMock, renewerMock); AsyncPage page = pubsub.listSubscriptionsAsync(TOPIC).get(); assertNull(page.nextPageCursor()); assertNull(page.nextPage()); @@ -1164,7 +1202,7 @@ public void testListTopicSubscriptionsAsyncEmpty() public void testListTopicSubscriptionsAsyncWithOptions() throws ExecutionException, InterruptedException { String cursor = "cursor"; - pubsub = options.service(); + pubsub = new PubSubImpl(options, renewerMock); ListTopicSubscriptionsRequest request = ListTopicSubscriptionsRequest.newBuilder() .setTopic(TOPIC_NAME_PB) .setPageSize(42) @@ -1180,7 +1218,7 @@ public void testListTopicSubscriptionsAsyncWithOptions() Future futureResponse = Futures.immediateFuture(response); EasyMock.expect(pubsubRpcMock.list(request)).andReturn(futureResponse); - EasyMock.replay(pubsubRpcMock); + EasyMock.replay(pubsubRpcMock, renewerMock); AsyncPage page = pubsub.listSubscriptionsAsync( TOPIC, ListOption.pageSize(42), ListOption.pageToken(cursor)).get(); assertNull(page.nextPageCursor()); @@ -1190,63 +1228,123 @@ public void testListTopicSubscriptionsAsyncWithOptions() Iterables.toArray(page.values(), SubscriptionId.class)); } + @Test + public void testPullMessages() { + pubsub = new PubSubImpl(options, renewerMock); + PullRequest request = PullRequest.newBuilder() + .setSubscription(SUBSCRIPTION_NAME_PB) + .setMaxMessages(42) + .setReturnImmediately(true) + .build(); + List messageList = ImmutableList.of( + ReceivedMessage.fromPb(pubsub, SUBSCRIPTION, MESSAGE_PB1), + ReceivedMessage.fromPb(pubsub, SUBSCRIPTION, MESSAGE_PB2)); + PullResponse response = PullResponse.newBuilder() + .addReceivedMessages(MESSAGE_PB1) + .addReceivedMessages(MESSAGE_PB2) + .build(); + EasyMock.expect(pubsubRpcMock.pull(request)).andReturn(Futures.immediateFuture(response)); + renewerMock.add(SUBSCRIPTION, ImmutableList.of("ackId1", "ackId2")); + EasyMock.replay(pubsubRpcMock, renewerMock); + Iterator messageIterator = pubsub.pull(SUBSCRIPTION, 42); + EasyMock.reset(renewerMock); + for (ReceivedMessage message : messageList) { + renewerMock.remove(SUBSCRIPTION, message.ackId()); + EasyMock.expectLastCall(); + } + EasyMock.replay(renewerMock); + while (messageIterator.hasNext()) { + messageIterator.next(); + } + } + + @Test + public void testPullMessagesAsync() throws ExecutionException, InterruptedException { + pubsub = new PubSubImpl(options, renewerMock); + PullRequest request = PullRequest.newBuilder() + .setSubscription(SUBSCRIPTION_NAME_PB) + .setMaxMessages(42) + .setReturnImmediately(true) + .build(); + List messageList = ImmutableList.of( + ReceivedMessage.fromPb(pubsub, SUBSCRIPTION, MESSAGE_PB1), + ReceivedMessage.fromPb(pubsub, SUBSCRIPTION, MESSAGE_PB2)); + PullResponse response = PullResponse.newBuilder() + .addReceivedMessages(MESSAGE_PB1) + .addReceivedMessages(MESSAGE_PB2) + .build(); + EasyMock.expect(pubsubRpcMock.pull(request)).andReturn(Futures.immediateFuture(response)); + renewerMock.add(SUBSCRIPTION, ImmutableList.of("ackId1", "ackId2")); + EasyMock.replay(pubsubRpcMock, renewerMock); + Iterator messageIterator = pubsub.pullAsync(SUBSCRIPTION, 42).get(); + EasyMock.reset(renewerMock); + for (ReceivedMessage message : messageList) { + renewerMock.remove(SUBSCRIPTION, message.ackId()); + EasyMock.expectLastCall(); + } + EasyMock.replay(renewerMock); + while (messageIterator.hasNext()) { + messageIterator.next(); + } + } + @Test public void testAckOneMessage() { - pubsub = options.service(); + pubsub = new PubSubImpl(options, renewerMock); AcknowledgeRequest request = AcknowledgeRequest.newBuilder() .setSubscription(SUBSCRIPTION_NAME_PB) .addAckIds("ackId") .build(); Future response = Futures.immediateFuture(Empty.getDefaultInstance()); EasyMock.expect(pubsubRpcMock.acknowledge(request)).andReturn(response); - EasyMock.replay(pubsubRpcMock); + EasyMock.replay(pubsubRpcMock, renewerMock); pubsub.ack(SUBSCRIPTION, "ackId"); } @Test public void testAckOneMessageAsync() throws ExecutionException, InterruptedException { - pubsub = options.service(); + pubsub = new PubSubImpl(options, renewerMock); AcknowledgeRequest request = AcknowledgeRequest.newBuilder() .setSubscription(SUBSCRIPTION_NAME_PB) .addAckIds("ackId") .build(); Future response = Futures.immediateFuture(Empty.getDefaultInstance()); EasyMock.expect(pubsubRpcMock.acknowledge(request)).andReturn(response); - EasyMock.replay(pubsubRpcMock); + EasyMock.replay(pubsubRpcMock, renewerMock); Future future = pubsub.ackAsync(SUBSCRIPTION, "ackId"); assertNull(future.get()); } @Test public void testAckMoreMessages() { - pubsub = options.service(); + pubsub = new PubSubImpl(options, renewerMock); AcknowledgeRequest request = AcknowledgeRequest.newBuilder() .setSubscription(SUBSCRIPTION_NAME_PB) .addAllAckIds(ImmutableList.of("ackId1", "ackId2")) .build(); Future response = Futures.immediateFuture(Empty.getDefaultInstance()); EasyMock.expect(pubsubRpcMock.acknowledge(request)).andReturn(response); - EasyMock.replay(pubsubRpcMock); + EasyMock.replay(pubsubRpcMock, renewerMock); pubsub.ack(SUBSCRIPTION, "ackId1", "ackId2"); } @Test public void testAckMoreMessagesAsync() throws ExecutionException, InterruptedException { - pubsub = options.service(); + pubsub = new PubSubImpl(options, renewerMock); AcknowledgeRequest request = AcknowledgeRequest.newBuilder() .setSubscription(SUBSCRIPTION_NAME_PB) .addAllAckIds(ImmutableList.of("ackId1", "ackId2")) .build(); Future response = Futures.immediateFuture(Empty.getDefaultInstance()); EasyMock.expect(pubsubRpcMock.acknowledge(request)).andReturn(response); - EasyMock.replay(pubsubRpcMock); + EasyMock.replay(pubsubRpcMock, renewerMock); Future future = pubsub.ackAsync(SUBSCRIPTION, "ackId1", "ackId2"); assertNull(future.get()); } @Test public void testAckMessageList() { - pubsub = options.service(); + pubsub = new PubSubImpl(options, renewerMock); List ackIds = ImmutableList.of("ackId1", "ackId2"); AcknowledgeRequest request = AcknowledgeRequest.newBuilder() .setSubscription(SUBSCRIPTION_NAME_PB) @@ -1254,13 +1352,13 @@ public void testAckMessageList() { .build(); Future response = Futures.immediateFuture(Empty.getDefaultInstance()); EasyMock.expect(pubsubRpcMock.acknowledge(request)).andReturn(response); - EasyMock.replay(pubsubRpcMock); + EasyMock.replay(pubsubRpcMock, renewerMock); pubsub.ack(SUBSCRIPTION, ackIds); } @Test public void testAckMessageListAsync() throws ExecutionException, InterruptedException { - pubsub = options.service(); + pubsub = new PubSubImpl(options, renewerMock); List ackIds = ImmutableList.of("ackId1", "ackId2"); AcknowledgeRequest request = AcknowledgeRequest.newBuilder() .setSubscription(SUBSCRIPTION_NAME_PB) @@ -1268,14 +1366,14 @@ public void testAckMessageListAsync() throws ExecutionException, InterruptedExce .build(); Future response = Futures.immediateFuture(Empty.getDefaultInstance()); EasyMock.expect(pubsubRpcMock.acknowledge(request)).andReturn(response); - EasyMock.replay(pubsubRpcMock); + EasyMock.replay(pubsubRpcMock, renewerMock); Future future = pubsub.ackAsync(SUBSCRIPTION, ackIds); assertNull(future.get()); } @Test public void testNackOneMessage() { - pubsub = options.service(); + pubsub = new PubSubImpl(options, renewerMock); ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder() .setAckDeadlineSeconds(0) .setSubscription(SUBSCRIPTION_NAME_PB) @@ -1283,13 +1381,13 @@ public void testNackOneMessage() { .build(); Future response = Futures.immediateFuture(Empty.getDefaultInstance()); EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response); - EasyMock.replay(pubsubRpcMock); + EasyMock.replay(pubsubRpcMock, renewerMock); pubsub.nack(SUBSCRIPTION, "ackId"); } @Test public void testNackOneMessageAsync() throws ExecutionException, InterruptedException { - pubsub = options.service(); + pubsub = new PubSubImpl(options, renewerMock); ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder() .setAckDeadlineSeconds(0) .setSubscription(SUBSCRIPTION_NAME_PB) @@ -1297,14 +1395,14 @@ public void testNackOneMessageAsync() throws ExecutionException, InterruptedExce .build(); Future response = Futures.immediateFuture(Empty.getDefaultInstance()); EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response); - EasyMock.replay(pubsubRpcMock); + EasyMock.replay(pubsubRpcMock, renewerMock); Future future = pubsub.nackAsync(SUBSCRIPTION, "ackId"); assertNull(future.get()); } @Test public void testNackMoreMessages() { - pubsub = options.service(); + pubsub = new PubSubImpl(options, renewerMock); ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder() .setAckDeadlineSeconds(0) .setSubscription(SUBSCRIPTION_NAME_PB) @@ -1312,13 +1410,13 @@ public void testNackMoreMessages() { .build(); Future response = Futures.immediateFuture(Empty.getDefaultInstance()); EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response); - EasyMock.replay(pubsubRpcMock); + EasyMock.replay(pubsubRpcMock, renewerMock); pubsub.nack(SUBSCRIPTION, "ackId1", "ackId2"); } @Test public void testNackMoreMessagesAsync() throws ExecutionException, InterruptedException { - pubsub = options.service(); + pubsub = new PubSubImpl(options, renewerMock); ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder() .setAckDeadlineSeconds(0) .setSubscription(SUBSCRIPTION_NAME_PB) @@ -1326,14 +1424,14 @@ public void testNackMoreMessagesAsync() throws ExecutionException, InterruptedEx .build(); Future response = Futures.immediateFuture(Empty.getDefaultInstance()); EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response); - EasyMock.replay(pubsubRpcMock); + EasyMock.replay(pubsubRpcMock, renewerMock); Future future = pubsub.nackAsync(SUBSCRIPTION, "ackId1", "ackId2"); assertNull(future.get()); } @Test public void testNackMessageList() { - pubsub = options.service(); + pubsub = new PubSubImpl(options, renewerMock); List ackIds = ImmutableList.of("ackId1", "ackId2"); ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder() .setAckDeadlineSeconds(0) @@ -1342,13 +1440,13 @@ public void testNackMessageList() { .build(); Future response = Futures.immediateFuture(Empty.getDefaultInstance()); EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response); - EasyMock.replay(pubsubRpcMock); + EasyMock.replay(pubsubRpcMock, renewerMock); pubsub.nack(SUBSCRIPTION, ackIds); } @Test public void testNackMessageListAsync() throws ExecutionException, InterruptedException { - pubsub = options.service(); + pubsub = new PubSubImpl(options, renewerMock); List ackIds = ImmutableList.of("ackId1", "ackId2"); ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder() .setAckDeadlineSeconds(0) @@ -1357,14 +1455,14 @@ public void testNackMessageListAsync() throws ExecutionException, InterruptedExc .build(); Future response = Futures.immediateFuture(Empty.getDefaultInstance()); EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response); - EasyMock.replay(pubsubRpcMock); + EasyMock.replay(pubsubRpcMock, renewerMock); Future future = pubsub.nackAsync(SUBSCRIPTION, ackIds); assertNull(future.get()); } @Test public void testModifyAckDeadlineOneMessage() { - pubsub = options.service(); + pubsub = new PubSubImpl(options, renewerMock); ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder() .setAckDeadlineSeconds(10) .setSubscription(SUBSCRIPTION_NAME_PB) @@ -1372,14 +1470,14 @@ public void testModifyAckDeadlineOneMessage() { .build(); Future response = Futures.immediateFuture(Empty.getDefaultInstance()); EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response); - EasyMock.replay(pubsubRpcMock); + EasyMock.replay(pubsubRpcMock, renewerMock); pubsub.modifyAckDeadline(SUBSCRIPTION, 10, TimeUnit.SECONDS, "ackId"); } @Test public void testModifyAckDeadlineOneMessageAsync() throws ExecutionException, InterruptedException { - pubsub = options.service(); + pubsub = new PubSubImpl(options, renewerMock); ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder() .setAckDeadlineSeconds(10) .setSubscription(SUBSCRIPTION_NAME_PB) @@ -1387,7 +1485,7 @@ public void testModifyAckDeadlineOneMessageAsync() .build(); Future response = Futures.immediateFuture(Empty.getDefaultInstance()); EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response); - EasyMock.replay(pubsubRpcMock); + EasyMock.replay(pubsubRpcMock, renewerMock); Future future = pubsub.modifyAckDeadlineAsync(SUBSCRIPTION, 10, TimeUnit.SECONDS, "ackId"); assertNull(future.get()); @@ -1395,7 +1493,7 @@ public void testModifyAckDeadlineOneMessageAsync() @Test public void testModifyAckDeadlineMoreMessages() { - pubsub = options.service(); + pubsub = new PubSubImpl(options, renewerMock); ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder() .setAckDeadlineSeconds(10) .setSubscription(SUBSCRIPTION_NAME_PB) @@ -1403,14 +1501,14 @@ public void testModifyAckDeadlineMoreMessages() { .build(); Future response = Futures.immediateFuture(Empty.getDefaultInstance()); EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response); - EasyMock.replay(pubsubRpcMock); + EasyMock.replay(pubsubRpcMock, renewerMock); pubsub.modifyAckDeadline(SUBSCRIPTION, 10, TimeUnit.SECONDS, "ackId1", "ackId2"); } @Test public void testModifyAckDeadlineMoreMessagesAsync() throws ExecutionException, InterruptedException { - pubsub = options.service(); + pubsub = new PubSubImpl(options, renewerMock); ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder() .setAckDeadlineSeconds(10) .setSubscription(SUBSCRIPTION_NAME_PB) @@ -1418,7 +1516,7 @@ public void testModifyAckDeadlineMoreMessagesAsync() .build(); Future response = Futures.immediateFuture(Empty.getDefaultInstance()); EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response); - EasyMock.replay(pubsubRpcMock); + EasyMock.replay(pubsubRpcMock, renewerMock); Future future = pubsub.modifyAckDeadlineAsync(SUBSCRIPTION, 10, TimeUnit.SECONDS, "ackId1", "ackId2"); assertNull(future.get()); @@ -1426,7 +1524,7 @@ public void testModifyAckDeadlineMoreMessagesAsync() @Test public void testModifyAckDeadlineMessageList() { - pubsub = options.service(); + pubsub = new PubSubImpl(options, renewerMock); List ackIds = ImmutableList.of("ackId1", "ackId2"); ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder() .setAckDeadlineSeconds(10) @@ -1435,14 +1533,14 @@ public void testModifyAckDeadlineMessageList() { .build(); Future response = Futures.immediateFuture(Empty.getDefaultInstance()); EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response); - EasyMock.replay(pubsubRpcMock); + EasyMock.replay(pubsubRpcMock, renewerMock); pubsub.modifyAckDeadline(SUBSCRIPTION, 10, TimeUnit.SECONDS, ackIds); } @Test public void testModifyAckDeadlineMessageListAsync() throws ExecutionException, InterruptedException { - pubsub = options.service(); + pubsub = new PubSubImpl(options, renewerMock); List ackIds = ImmutableList.of("ackId1", "ackId2"); ModifyAckDeadlineRequest request = ModifyAckDeadlineRequest.newBuilder() .setAckDeadlineSeconds(10) @@ -1451,17 +1549,22 @@ public void testModifyAckDeadlineMessageListAsync() .build(); Future response = Futures.immediateFuture(Empty.getDefaultInstance()); EasyMock.expect(pubsubRpcMock.modify(request)).andReturn(response); - EasyMock.replay(pubsubRpcMock); + EasyMock.replay(pubsubRpcMock, renewerMock); Future future = pubsub.modifyAckDeadlineAsync(SUBSCRIPTION, 10, TimeUnit.SECONDS, ackIds); assertNull(future.get()); } @Test public void testClose() throws Exception { - pubsub = options.service(); + pubsub = new PubSubImpl(options, renewerMock); pubsubRpcMock.close(); EasyMock.expectLastCall(); - EasyMock.replay(pubsubRpcMock); + EasyMock.expectLastCall(); + renewerMock.close(); + EasyMock.expectLastCall(); + EasyMock.replay(pubsubRpcMock, renewerMock); + pubsub.close(); + // closing again should do nothing pubsub.close(); } }